mirror of
https://github.com/neondatabase/neon.git
synced 2026-03-03 16:30:38 +00:00
Compare commits
44 Commits
sergey/und
...
gm/sql_ove
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6abe34bfa1 | ||
|
|
b932c14f94 | ||
|
|
2e05a9b652 | ||
|
|
b808160aff | ||
|
|
2997018005 | ||
|
|
8f98cc29fa | ||
|
|
b4ee8e3b73 | ||
|
|
68ea916bf1 | ||
|
|
eea3dd54da | ||
|
|
4071b22519 | ||
|
|
c58edec63a | ||
|
|
fdb9d4373d | ||
|
|
cb88df7ffa | ||
|
|
595248532c | ||
|
|
158c051c9a | ||
|
|
143c4954df | ||
|
|
373ae7672b | ||
|
|
1233923c3a | ||
|
|
1a3a7f14dd | ||
|
|
4e9067a8c2 | ||
|
|
ebea298415 | ||
|
|
5ffa20dd82 | ||
|
|
75ea8106ec | ||
|
|
017d3a390d | ||
|
|
589cf1ed21 | ||
|
|
0c82ff3d98 | ||
|
|
8895f28dae | ||
|
|
b6c7c3290f | ||
|
|
fd31fafeee | ||
|
|
db8dd6f380 | ||
|
|
36c20946b4 | ||
|
|
89b5589b1b | ||
|
|
53f438a8a8 | ||
|
|
356439aa33 | ||
|
|
c237a2f5fb | ||
|
|
15d1f85552 | ||
|
|
732acc54c1 | ||
|
|
5d0ecadf7c | ||
|
|
f7995b3c70 | ||
|
|
13e53e5dc8 | ||
|
|
c94b8998be | ||
|
|
218062ceba | ||
|
|
8d295780cb | ||
|
|
a64044a7a9 |
@@ -4,7 +4,7 @@
|
||||
hakari-package = "workspace_hack"
|
||||
|
||||
# Format for `workspace-hack = ...` lines in other Cargo.tomls. Requires cargo-hakari 0.9.8 or above.
|
||||
dep-format-version = "3"
|
||||
dep-format-version = "4"
|
||||
|
||||
# Setting workspace.resolver = "2" in the root Cargo.toml is HIGHLY recommended.
|
||||
# Hakari works much better with the new feature resolver.
|
||||
|
||||
1
.github/PULL_REQUEST_TEMPLATE/release-pr.md
vendored
1
.github/PULL_REQUEST_TEMPLATE/release-pr.md
vendored
@@ -10,6 +10,7 @@
|
||||
<!-- List everything that should be done **before** release, any issues / setting changes / etc -->
|
||||
|
||||
### Checklist after release
|
||||
- [ ] Make sure instructions from PRs included in this release and labeled `manual_release_instructions` are executed (either by you or by people who wrote them).
|
||||
- [ ] Based on the merged commits write release notes and open a PR into `website` repo ([example](https://github.com/neondatabase/website/pull/219/files))
|
||||
- [ ] Check [#dev-production-stream](https://neondb.slack.com/archives/C03F5SM1N02) Slack channel
|
||||
- [ ] Check [stuck projects page](https://console.neon.tech/admin/projects?sort=last_active&order=desc&stuck=true)
|
||||
|
||||
4
.github/actions/allure-report/action.yml
vendored
4
.github/actions/allure-report/action.yml
vendored
@@ -45,12 +45,12 @@ runs:
|
||||
shell: bash -euxo pipefail {0}
|
||||
run: |
|
||||
if [ "${{ inputs.action }}" != "store" ] && [ "${{ inputs.action }}" != "generate" ]; then
|
||||
echo 2>&1 "Unknown inputs.action type '${{ inputs.action }}'; allowed 'generate' or 'store' only"
|
||||
echo >&2 "Unknown inputs.action type '${{ inputs.action }}'; allowed 'generate' or 'store' only"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
if [ -z "${{ inputs.test_selection }}" ] && [ "${{ inputs.action }}" == "store" ]; then
|
||||
echo 2>&1 "inputs.test_selection must be set for 'store' action"
|
||||
echo >&2 "inputs.test_selection must be set for 'store' action"
|
||||
exit 2
|
||||
fi
|
||||
|
||||
|
||||
2
.github/actions/download/action.yml
vendored
2
.github/actions/download/action.yml
vendored
@@ -37,7 +37,7 @@ runs:
|
||||
echo 'SKIPPED=true' >> $GITHUB_OUTPUT
|
||||
exit 0
|
||||
else
|
||||
echo 2>&1 "Neither s3://${BUCKET}/${PREFIX}/${FILENAME} nor its version from previous attempts exist"
|
||||
echo >&2 "Neither s3://${BUCKET}/${PREFIX}/${FILENAME} nor its version from previous attempts exist"
|
||||
exit 1
|
||||
fi
|
||||
fi
|
||||
|
||||
@@ -58,7 +58,7 @@ runs:
|
||||
done
|
||||
|
||||
if [ -z "${branch_id}" ] || [ "${branch_id}" == "null" ]; then
|
||||
echo 2>&1 "Failed to create branch after 10 attempts, the latest response was: ${branch}"
|
||||
echo >&2 "Failed to create branch after 10 attempts, the latest response was: ${branch}"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
@@ -122,7 +122,7 @@ runs:
|
||||
done
|
||||
|
||||
if [ -z "${password}" ] || [ "${password}" == "null" ]; then
|
||||
echo 2>&1 "Failed to reset password after 10 attempts, the latest response was: ${reset_password}"
|
||||
echo >&2 "Failed to reset password after 10 attempts, the latest response was: ${reset_password}"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
|
||||
@@ -48,7 +48,7 @@ runs:
|
||||
done
|
||||
|
||||
if [ -z "${branch_id}" ] || [ "${branch_id}" == "null" ]; then
|
||||
echo 2>&1 "Failed to delete branch after 10 attempts, the latest response was: ${deleted_branch}"
|
||||
echo >&2 "Failed to delete branch after 10 attempts, the latest response was: ${deleted_branch}"
|
||||
exit 1
|
||||
fi
|
||||
env:
|
||||
|
||||
@@ -202,7 +202,7 @@ runs:
|
||||
prefix: latest
|
||||
|
||||
- name: Create Allure report
|
||||
if: success() || failure()
|
||||
if: ${{ !cancelled() }}
|
||||
uses: ./.github/actions/allure-report
|
||||
with:
|
||||
action: store
|
||||
|
||||
6
.github/actions/upload/action.yml
vendored
6
.github/actions/upload/action.yml
vendored
@@ -23,7 +23,7 @@ runs:
|
||||
mkdir -p $(dirname $ARCHIVE)
|
||||
|
||||
if [ -f ${ARCHIVE} ]; then
|
||||
echo 2>&1 "File ${ARCHIVE} already exist. Something went wrong before"
|
||||
echo >&2 "File ${ARCHIVE} already exist. Something went wrong before"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
@@ -33,10 +33,10 @@ runs:
|
||||
elif [ -f ${SOURCE} ]; then
|
||||
time tar -cf ${ARCHIVE} --zstd ${SOURCE}
|
||||
elif ! ls ${SOURCE} > /dev/null 2>&1; then
|
||||
echo 2>&1 "${SOURCE} does not exist"
|
||||
echo >&2 "${SOURCE} does not exist"
|
||||
exit 2
|
||||
else
|
||||
echo 2>&1 "${SOURCE} is neither a directory nor a file, do not know how to handle it"
|
||||
echo >&2 "${SOURCE} is neither a directory nor a file, do not know how to handle it"
|
||||
exit 3
|
||||
fi
|
||||
|
||||
|
||||
@@ -17,7 +17,7 @@ storage:
|
||||
kind: "LayerAccessThreshold"
|
||||
period: "10m"
|
||||
threshold: &default_eviction_threshold "24h"
|
||||
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
|
||||
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
|
||||
remote_storage:
|
||||
bucket_name: "{{ bucket_name }}"
|
||||
bucket_region: "{{ bucket_region }}"
|
||||
|
||||
2
.github/ansible/prod.eu-central-1.hosts.yaml
vendored
2
.github/ansible/prod.eu-central-1.hosts.yaml
vendored
@@ -17,7 +17,7 @@ storage:
|
||||
kind: "LayerAccessThreshold"
|
||||
period: "10m"
|
||||
threshold: &default_eviction_threshold "24h"
|
||||
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
|
||||
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
|
||||
remote_storage:
|
||||
bucket_name: "{{ bucket_name }}"
|
||||
bucket_region: "{{ bucket_region }}"
|
||||
|
||||
2
.github/ansible/prod.us-east-2.hosts.yaml
vendored
2
.github/ansible/prod.us-east-2.hosts.yaml
vendored
@@ -17,7 +17,7 @@ storage:
|
||||
kind: "LayerAccessThreshold"
|
||||
period: "10m"
|
||||
threshold: &default_eviction_threshold "24h"
|
||||
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
|
||||
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
|
||||
remote_storage:
|
||||
bucket_name: "{{ bucket_name }}"
|
||||
bucket_region: "{{ bucket_region }}"
|
||||
|
||||
8
.github/ansible/prod.us-west-2.hosts.yaml
vendored
8
.github/ansible/prod.us-west-2.hosts.yaml
vendored
@@ -17,7 +17,7 @@ storage:
|
||||
kind: "LayerAccessThreshold"
|
||||
period: "10m"
|
||||
threshold: &default_eviction_threshold "24h"
|
||||
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
|
||||
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
|
||||
remote_storage:
|
||||
bucket_name: "{{ bucket_name }}"
|
||||
bucket_region: "{{ bucket_region }}"
|
||||
@@ -34,7 +34,7 @@ storage:
|
||||
pageservers:
|
||||
hosts:
|
||||
pageserver-0.us-west-2.aws.neon.tech:
|
||||
ansible_host: i-0d9f6dfae0e1c780d
|
||||
ansible_host: i-0d9f6dfae0e1c780d
|
||||
pageserver-1.us-west-2.aws.neon.tech:
|
||||
ansible_host: i-0c834be1dddba8b3f
|
||||
pageserver-2.us-west-2.aws.neon.tech:
|
||||
@@ -49,5 +49,5 @@ storage:
|
||||
safekeeper-1.us-west-2.aws.neon.tech:
|
||||
ansible_host: i-074682f9d3c712e7c
|
||||
safekeeper-2.us-west-2.aws.neon.tech:
|
||||
ansible_host: i-042b7efb1729d7966
|
||||
|
||||
ansible_host: i-042b7efb1729d7966
|
||||
|
||||
|
||||
2
.github/ansible/staging.eu-west-1.hosts.yaml
vendored
2
.github/ansible/staging.eu-west-1.hosts.yaml
vendored
@@ -17,7 +17,7 @@ storage:
|
||||
kind: "LayerAccessThreshold"
|
||||
period: "20m"
|
||||
threshold: &default_eviction_threshold "20m"
|
||||
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
|
||||
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
|
||||
remote_storage:
|
||||
bucket_name: "{{ bucket_name }}"
|
||||
bucket_region: "{{ bucket_region }}"
|
||||
|
||||
2
.github/ansible/staging.us-east-2.hosts.yaml
vendored
2
.github/ansible/staging.us-east-2.hosts.yaml
vendored
@@ -17,7 +17,7 @@ storage:
|
||||
kind: "LayerAccessThreshold"
|
||||
period: "20m"
|
||||
threshold: &default_eviction_threshold "20m"
|
||||
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
|
||||
evictions_low_residence_duration_metric_threshold: *default_eviction_threshold
|
||||
remote_storage:
|
||||
bucket_name: "{{ bucket_name }}"
|
||||
bucket_region: "{{ bucket_region }}"
|
||||
|
||||
@@ -7,13 +7,13 @@ deploymentStrategy:
|
||||
maxSurge: 100%
|
||||
maxUnavailable: 50%
|
||||
|
||||
# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
|
||||
# Delay the kill signal by 5 minutes (5 * 60)
|
||||
# The pod(s) will stay in Terminating, keeps the existing connections
|
||||
# but doesn't receive new ones
|
||||
containerLifecycle:
|
||||
preStop:
|
||||
exec:
|
||||
command: ["/bin/sh", "-c", "sleep 604800"]
|
||||
command: ["/bin/sh", "-c", "sleep 300"]
|
||||
terminationGracePeriodSeconds: 604800
|
||||
|
||||
image:
|
||||
|
||||
@@ -1,6 +1,22 @@
|
||||
# Helm chart values for neon-proxy-scram.
|
||||
# This is a YAML-formatted file.
|
||||
|
||||
deploymentStrategy:
|
||||
type: RollingUpdate
|
||||
rollingUpdate:
|
||||
maxSurge: 100%
|
||||
maxUnavailable: 50%
|
||||
|
||||
# Delay the kill signal by 5 minutes (5 * 60)
|
||||
# The pod(s) will stay in Terminating, keeps the existing connections
|
||||
# but doesn't receive new ones
|
||||
containerLifecycle:
|
||||
preStop:
|
||||
exec:
|
||||
command: ["/bin/sh", "-c", "sleep 300"]
|
||||
terminationGracePeriodSeconds: 604800
|
||||
|
||||
|
||||
image:
|
||||
repository: neondatabase/neon
|
||||
|
||||
|
||||
@@ -7,15 +7,16 @@ deploymentStrategy:
|
||||
maxSurge: 100%
|
||||
maxUnavailable: 50%
|
||||
|
||||
# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
|
||||
# Delay the kill signal by 5 minutes (5 * 60)
|
||||
# The pod(s) will stay in Terminating, keeps the existing connections
|
||||
# but doesn't receive new ones
|
||||
containerLifecycle:
|
||||
preStop:
|
||||
exec:
|
||||
command: ["/bin/sh", "-c", "sleep 604800"]
|
||||
command: ["/bin/sh", "-c", "sleep 300"]
|
||||
terminationGracePeriodSeconds: 604800
|
||||
|
||||
|
||||
image:
|
||||
repository: neondatabase/neon
|
||||
|
||||
|
||||
@@ -7,13 +7,13 @@ deploymentStrategy:
|
||||
maxSurge: 100%
|
||||
maxUnavailable: 50%
|
||||
|
||||
# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
|
||||
# Delay the kill signal by 5 minutes (5 * 60)
|
||||
# The pod(s) will stay in Terminating, keeps the existing connections
|
||||
# but doesn't receive new ones
|
||||
containerLifecycle:
|
||||
preStop:
|
||||
exec:
|
||||
command: ["/bin/sh", "-c", "sleep 604800"]
|
||||
command: ["/bin/sh", "-c", "sleep 300"]
|
||||
terminationGracePeriodSeconds: 604800
|
||||
|
||||
|
||||
|
||||
@@ -7,13 +7,13 @@ deploymentStrategy:
|
||||
maxSurge: 100%
|
||||
maxUnavailable: 50%
|
||||
|
||||
# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
|
||||
# Delay the kill signal by 5 minutes (5 * 60)
|
||||
# The pod(s) will stay in Terminating, keeps the existing connections
|
||||
# but doesn't receive new ones
|
||||
containerLifecycle:
|
||||
preStop:
|
||||
exec:
|
||||
command: ["/bin/sh", "-c", "sleep 604800"]
|
||||
command: ["/bin/sh", "-c", "sleep 300"]
|
||||
terminationGracePeriodSeconds: 604800
|
||||
|
||||
|
||||
|
||||
@@ -7,13 +7,13 @@ deploymentStrategy:
|
||||
maxSurge: 100%
|
||||
maxUnavailable: 50%
|
||||
|
||||
# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
|
||||
# Delay the kill signal by 5 minutes (5 * 60)
|
||||
# The pod(s) will stay in Terminating, keeps the existing connections
|
||||
# but doesn't receive new ones
|
||||
containerLifecycle:
|
||||
preStop:
|
||||
exec:
|
||||
command: ["/bin/sh", "-c", "sleep 604800"]
|
||||
command: ["/bin/sh", "-c", "sleep 300"]
|
||||
terminationGracePeriodSeconds: 604800
|
||||
|
||||
|
||||
|
||||
@@ -7,13 +7,13 @@ deploymentStrategy:
|
||||
maxSurge: 100%
|
||||
maxUnavailable: 50%
|
||||
|
||||
# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
|
||||
# Delay the kill signal by 5 minutes (5 * 60)
|
||||
# The pod(s) will stay in Terminating, keeps the existing connections
|
||||
# but doesn't receive new ones
|
||||
containerLifecycle:
|
||||
preStop:
|
||||
exec:
|
||||
command: ["/bin/sh", "-c", "sleep 604800"]
|
||||
command: ["/bin/sh", "-c", "sleep 300"]
|
||||
terminationGracePeriodSeconds: 604800
|
||||
|
||||
|
||||
|
||||
@@ -7,13 +7,13 @@ deploymentStrategy:
|
||||
maxSurge: 100%
|
||||
maxUnavailable: 50%
|
||||
|
||||
# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
|
||||
# Delay the kill signal by 5 minutes (5 * 60)
|
||||
# The pod(s) will stay in Terminating, keeps the existing connections
|
||||
# but doesn't receive new ones
|
||||
containerLifecycle:
|
||||
preStop:
|
||||
exec:
|
||||
command: ["/bin/sh", "-c", "sleep 604800"]
|
||||
command: ["/bin/sh", "-c", "sleep 300"]
|
||||
terminationGracePeriodSeconds: 604800
|
||||
|
||||
|
||||
|
||||
36
.github/workflows/benchmarking.yml
vendored
36
.github/workflows/benchmarking.yml
vendored
@@ -30,7 +30,7 @@ defaults:
|
||||
|
||||
concurrency:
|
||||
# Allow only one workflow per any non-`main` branch.
|
||||
group: ${{ github.workflow }}-${{ github.ref }}-${{ github.ref == 'refs/heads/main' && github.sha || 'anysha' }}
|
||||
group: ${{ github.workflow }}-${{ github.ref_name }}-${{ github.ref_name == 'main' && github.sha || 'anysha' }}
|
||||
cancel-in-progress: true
|
||||
|
||||
jobs:
|
||||
@@ -42,7 +42,7 @@ jobs:
|
||||
DEFAULT_PG_VERSION: 14
|
||||
TEST_OUTPUT: /tmp/test_output
|
||||
BUILD_TYPE: remote
|
||||
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref == 'refs/heads/main' ) }}
|
||||
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref_name == 'main' ) }}
|
||||
PLATFORM: "neon-staging"
|
||||
|
||||
runs-on: [ self-hosted, us-east-2, x64 ]
|
||||
@@ -92,7 +92,7 @@ jobs:
|
||||
api_key: ${{ secrets.NEON_STAGING_API_KEY }}
|
||||
|
||||
- name: Create Allure report
|
||||
if: success() || failure()
|
||||
if: ${{ !cancelled() }}
|
||||
uses: ./.github/actions/allure-report
|
||||
with:
|
||||
action: generate
|
||||
@@ -174,7 +174,7 @@ jobs:
|
||||
DEFAULT_PG_VERSION: 14
|
||||
TEST_OUTPUT: /tmp/test_output
|
||||
BUILD_TYPE: remote
|
||||
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref == 'refs/heads/main' ) }}
|
||||
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref_name == 'main' ) }}
|
||||
PLATFORM: ${{ matrix.platform }}
|
||||
|
||||
runs-on: [ self-hosted, us-east-2, x64 ]
|
||||
@@ -226,7 +226,7 @@ jobs:
|
||||
CONNSTR=${{ secrets.BENCHMARK_RDS_POSTGRES_CONNSTR }}
|
||||
;;
|
||||
*)
|
||||
echo 2>&1 "Unknown PLATFORM=${PLATFORM}. Allowed only 'neon-captest-reuse', 'neon-captest-new', 'neon-captest-freetier', 'rds-aurora', or 'rds-postgres'"
|
||||
echo >&2 "Unknown PLATFORM=${PLATFORM}. Allowed only 'neon-captest-reuse', 'neon-captest-new', 'neon-captest-freetier', 'rds-aurora', or 'rds-postgres'"
|
||||
exit 1
|
||||
;;
|
||||
esac
|
||||
@@ -282,7 +282,7 @@ jobs:
|
||||
api_key: ${{ secrets.NEON_STAGING_API_KEY }}
|
||||
|
||||
- name: Create Allure report
|
||||
if: success() || failure()
|
||||
if: ${{ !cancelled() }}
|
||||
uses: ./.github/actions/allure-report
|
||||
with:
|
||||
action: generate
|
||||
@@ -305,7 +305,7 @@ jobs:
|
||||
#
|
||||
# *_CLICKBENCH_CONNSTR: Genuine ClickBench DB with ~100M rows
|
||||
# *_CLICKBENCH_10M_CONNSTR: DB with the first 10M rows of ClickBench DB
|
||||
if: success() || failure()
|
||||
if: ${{ !cancelled() }}
|
||||
needs: [ generate-matrices, pgbench-compare ]
|
||||
|
||||
strategy:
|
||||
@@ -317,7 +317,7 @@ jobs:
|
||||
DEFAULT_PG_VERSION: 14
|
||||
TEST_OUTPUT: /tmp/test_output
|
||||
BUILD_TYPE: remote
|
||||
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref == 'refs/heads/main' ) }}
|
||||
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref_name == 'main' ) }}
|
||||
PLATFORM: ${{ matrix.platform }}
|
||||
|
||||
runs-on: [ self-hosted, us-east-2, x64 ]
|
||||
@@ -356,7 +356,7 @@ jobs:
|
||||
CONNSTR=${{ secrets.BENCHMARK_RDS_POSTGRES_CLICKBENCH_10M_CONNSTR }}
|
||||
;;
|
||||
*)
|
||||
echo 2>&1 "Unknown PLATFORM=${PLATFORM}. Allowed only 'neon-captest-reuse', 'rds-aurora', or 'rds-postgres'"
|
||||
echo >&2 "Unknown PLATFORM=${PLATFORM}. Allowed only 'neon-captest-reuse', 'rds-aurora', or 'rds-postgres'"
|
||||
exit 1
|
||||
;;
|
||||
esac
|
||||
@@ -379,7 +379,7 @@ jobs:
|
||||
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
|
||||
|
||||
- name: Create Allure report
|
||||
if: success() || failure()
|
||||
if: ${{ !cancelled() }}
|
||||
uses: ./.github/actions/allure-report
|
||||
with:
|
||||
action: generate
|
||||
@@ -401,7 +401,7 @@ jobs:
|
||||
# We might change it after https://github.com/neondatabase/neon/issues/2900.
|
||||
#
|
||||
# *_TPCH_S10_CONNSTR: DB generated with scale factor 10 (~10 GB)
|
||||
if: success() || failure()
|
||||
if: ${{ !cancelled() }}
|
||||
needs: [ generate-matrices, clickbench-compare ]
|
||||
|
||||
strategy:
|
||||
@@ -413,7 +413,7 @@ jobs:
|
||||
DEFAULT_PG_VERSION: 14
|
||||
TEST_OUTPUT: /tmp/test_output
|
||||
BUILD_TYPE: remote
|
||||
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref == 'refs/heads/main' ) }}
|
||||
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref_name == 'main' ) }}
|
||||
PLATFORM: ${{ matrix.platform }}
|
||||
|
||||
runs-on: [ self-hosted, us-east-2, x64 ]
|
||||
@@ -452,7 +452,7 @@ jobs:
|
||||
CONNSTR=${{ secrets.BENCHMARK_RDS_POSTGRES_TPCH_S10_CONNSTR }}
|
||||
;;
|
||||
*)
|
||||
echo 2>&1 "Unknown PLATFORM=${PLATFORM}. Allowed only 'neon-captest-reuse', 'rds-aurora', or 'rds-postgres'"
|
||||
echo >&2 "Unknown PLATFORM=${PLATFORM}. Allowed only 'neon-captest-reuse', 'rds-aurora', or 'rds-postgres'"
|
||||
exit 1
|
||||
;;
|
||||
esac
|
||||
@@ -475,7 +475,7 @@ jobs:
|
||||
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
|
||||
|
||||
- name: Create Allure report
|
||||
if: success() || failure()
|
||||
if: ${{ !cancelled() }}
|
||||
uses: ./.github/actions/allure-report
|
||||
with:
|
||||
action: generate
|
||||
@@ -491,7 +491,7 @@ jobs:
|
||||
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
|
||||
|
||||
user-examples-compare:
|
||||
if: success() || failure()
|
||||
if: ${{ !cancelled() }}
|
||||
needs: [ generate-matrices, tpch-compare ]
|
||||
|
||||
strategy:
|
||||
@@ -503,7 +503,7 @@ jobs:
|
||||
DEFAULT_PG_VERSION: 14
|
||||
TEST_OUTPUT: /tmp/test_output
|
||||
BUILD_TYPE: remote
|
||||
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref == 'refs/heads/main' ) }}
|
||||
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref_name == 'main' ) }}
|
||||
PLATFORM: ${{ matrix.platform }}
|
||||
|
||||
runs-on: [ self-hosted, us-east-2, x64 ]
|
||||
@@ -542,7 +542,7 @@ jobs:
|
||||
CONNSTR=${{ secrets.BENCHMARK_USER_EXAMPLE_RDS_POSTGRES_CONNSTR }}
|
||||
;;
|
||||
*)
|
||||
echo 2>&1 "Unknown PLATFORM=${PLATFORM}. Allowed only 'neon-captest-reuse', 'rds-aurora', or 'rds-postgres'"
|
||||
echo >&2 "Unknown PLATFORM=${PLATFORM}. Allowed only 'neon-captest-reuse', 'rds-aurora', or 'rds-postgres'"
|
||||
exit 1
|
||||
;;
|
||||
esac
|
||||
@@ -565,7 +565,7 @@ jobs:
|
||||
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
|
||||
|
||||
- name: Create Allure report
|
||||
if: success() || failure()
|
||||
if: ${{ !cancelled() }}
|
||||
uses: ./.github/actions/allure-report
|
||||
with:
|
||||
action: generate
|
||||
|
||||
6
.github/workflows/build_and_test.yml
vendored
6
.github/workflows/build_and_test.yml
vendored
@@ -13,7 +13,7 @@ defaults:
|
||||
|
||||
concurrency:
|
||||
# Allow only one workflow per any non-`main` branch.
|
||||
group: ${{ github.workflow }}-${{ github.ref }}-${{ github.ref == 'refs/heads/main' && github.sha || 'anysha' }}
|
||||
group: ${{ github.workflow }}-${{ github.ref_name }}-${{ github.ref_name == 'main' && github.sha || 'anysha' }}
|
||||
cancel-in-progress: true
|
||||
|
||||
env:
|
||||
@@ -368,7 +368,7 @@ jobs:
|
||||
build_type: ${{ matrix.build_type }}
|
||||
test_selection: performance
|
||||
run_in_parallel: false
|
||||
save_perf_report: ${{ github.ref == 'refs/heads/main' }}
|
||||
save_perf_report: ${{ github.ref_name == 'main' }}
|
||||
env:
|
||||
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
|
||||
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
|
||||
@@ -1007,7 +1007,7 @@ jobs:
|
||||
|
||||
S3_KEY=$(aws s3api list-objects-v2 --bucket ${BUCKET} --prefix ${OLD_PREFIX} | jq -r '.Contents[].Key' | grep ${FILENAME} | sort --version-sort | tail -1 || true)
|
||||
if [ -z "${S3_KEY}" ]; then
|
||||
echo 2>&1 "Neither s3://${BUCKET}/${OLD_PREFIX}/${FILENAME} nor its version from previous attempts exist"
|
||||
echo >&2 "Neither s3://${BUCKET}/${OLD_PREFIX}/${FILENAME} nor its version from previous attempts exist"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
|
||||
2
.github/workflows/neon_extra_builds.yml
vendored
2
.github/workflows/neon_extra_builds.yml
vendored
@@ -12,7 +12,7 @@ defaults:
|
||||
|
||||
concurrency:
|
||||
# Allow only one workflow per any non-`main` branch.
|
||||
group: ${{ github.workflow }}-${{ github.ref }}-${{ github.ref == 'refs/heads/main' && github.sha || 'anysha' }}
|
||||
group: ${{ github.workflow }}-${{ github.ref_name }}-${{ github.ref_name == 'main' && github.sha || 'anysha' }}
|
||||
cancel-in-progress: true
|
||||
|
||||
env:
|
||||
|
||||
2
.github/workflows/pg_clients.yml
vendored
2
.github/workflows/pg_clients.yml
vendored
@@ -14,7 +14,7 @@ on:
|
||||
|
||||
concurrency:
|
||||
# Allow only one workflow per any non-`main` branch.
|
||||
group: ${{ github.workflow }}-${{ github.ref }}-${{ github.ref == 'refs/heads/main' && github.sha || 'anysha' }}
|
||||
group: ${{ github.workflow }}-${{ github.ref_name }}-${{ github.ref_name == 'main' && github.sha || 'anysha' }}
|
||||
cancel-in-progress: true
|
||||
|
||||
jobs:
|
||||
|
||||
1452
Cargo.lock
generated
1452
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
35
Cargo.toml
35
Cargo.toml
@@ -24,10 +24,10 @@ atty = "0.2.14"
|
||||
aws-config = { version = "0.51.0", default-features = false, features=["rustls"] }
|
||||
aws-sdk-s3 = "0.21.0"
|
||||
aws-smithy-http = "0.51.0"
|
||||
aws-types = "0.51.0"
|
||||
aws-types = "0.55"
|
||||
base64 = "0.13.0"
|
||||
bincode = "1.3"
|
||||
bindgen = "0.61"
|
||||
bindgen = "0.65"
|
||||
bstr = "1.0"
|
||||
byteorder = "1.4"
|
||||
bytes = "1.0"
|
||||
@@ -50,12 +50,12 @@ git-version = "0.3"
|
||||
hashbrown = "0.13"
|
||||
hashlink = "0.8.1"
|
||||
hex = "0.4"
|
||||
hex-literal = "0.3"
|
||||
hex-literal = "0.4"
|
||||
hmac = "0.12.1"
|
||||
hostname = "0.3.1"
|
||||
humantime = "2.1"
|
||||
humantime-serde = "1.1.1"
|
||||
hyper = "0.14"
|
||||
hyper = { version = "0.14", features = ["http2", "tcp", "runtime", "http1"]}
|
||||
hyper-tungstenite = "0.9"
|
||||
itertools = "0.10"
|
||||
jsonwebtoken = "8"
|
||||
@@ -80,18 +80,18 @@ reqwest = { version = "0.11", default-features = false, features = ["rustls-tls"
|
||||
reqwest-tracing = { version = "0.4.0", features = ["opentelemetry_0_18"] }
|
||||
reqwest-middleware = "0.2.0"
|
||||
routerify = "3"
|
||||
rpds = "0.12.0"
|
||||
rpds = "0.13"
|
||||
rustls = "0.20"
|
||||
rustls-pemfile = "1"
|
||||
rustls-split = "0.3"
|
||||
scopeguard = "1.1"
|
||||
sentry = { version = "0.29", default-features = false, features = ["backtrace", "contexts", "panic", "rustls", "reqwest" ] }
|
||||
sentry = { version = "0.30", default-features = false, features = ["backtrace", "contexts", "panic", "rustls", "reqwest" ] }
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1"
|
||||
serde_with = "2.0"
|
||||
sha2 = "0.10.2"
|
||||
signal-hook = "0.3"
|
||||
socket2 = "0.4.4"
|
||||
socket2 = "0.5"
|
||||
strum = "0.24"
|
||||
strum_macros = "0.24"
|
||||
svg_fmt = "0.4.1"
|
||||
@@ -106,27 +106,28 @@ tokio-postgres-rustls = "0.9.0"
|
||||
tokio-rustls = "0.23"
|
||||
tokio-stream = "0.1"
|
||||
tokio-util = { version = "0.7", features = ["io"] }
|
||||
toml = "0.5"
|
||||
toml_edit = { version = "0.17", features = ["easy"] }
|
||||
tonic = {version = "0.8", features = ["tls", "tls-roots"]}
|
||||
toml = "0.7"
|
||||
toml_edit = "0.19"
|
||||
tonic = {version = "0.9", features = ["tls", "tls-roots"]}
|
||||
tracing = "0.1"
|
||||
tracing-opentelemetry = "0.18.0"
|
||||
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
|
||||
url = "2.2"
|
||||
uuid = { version = "1.2", features = ["v4", "serde"] }
|
||||
walkdir = "2.3.2"
|
||||
webpki-roots = "0.22.5"
|
||||
x509-parser = "0.14"
|
||||
webpki-roots = "0.23"
|
||||
x509-parser = "0.15"
|
||||
percent-encoding = "1.0"
|
||||
|
||||
## TODO replace this with tracing
|
||||
env_logger = "0.10"
|
||||
log = "0.4"
|
||||
|
||||
## Libraries from neondatabase/ git forks, ideally with changes to be upstreamed
|
||||
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" }
|
||||
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e", features = ["with-chrono-0_4", "array-impls"] }
|
||||
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" }
|
||||
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" }
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" }
|
||||
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e", features = ["array-impls"] }
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e", features = ["array-impls"] }
|
||||
tokio-tar = { git = "https://github.com/neondatabase/tokio-tar.git", rev="404df61437de0feef49ba2ccdbdd94eb8ad6e142" }
|
||||
|
||||
## Other git libraries
|
||||
@@ -154,9 +155,9 @@ workspace_hack = { version = "0.1", path = "./workspace_hack/" }
|
||||
## Build dependencies
|
||||
criterion = "0.4"
|
||||
rcgen = "0.10"
|
||||
rstest = "0.16"
|
||||
rstest = "0.17"
|
||||
tempfile = "3.4"
|
||||
tonic-build = "0.8"
|
||||
tonic-build = "0.9"
|
||||
|
||||
# This is only needed for proxy's tests.
|
||||
# TODO: we should probably fork `tokio-postgres-rustls` instead.
|
||||
|
||||
@@ -12,7 +12,7 @@ FROM debian:bullseye-slim AS build-deps
|
||||
RUN apt update && \
|
||||
apt install -y git autoconf automake libtool build-essential bison flex libreadline-dev \
|
||||
zlib1g-dev libxml2-dev libcurl4-openssl-dev libossp-uuid-dev wget pkg-config libssl-dev \
|
||||
libicu-dev libxslt1-dev
|
||||
libicu-dev libxslt1-dev liblz4-dev libzstd-dev
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
@@ -24,8 +24,13 @@ FROM build-deps AS pg-build
|
||||
ARG PG_VERSION
|
||||
COPY vendor/postgres-${PG_VERSION} postgres
|
||||
RUN cd postgres && \
|
||||
./configure CFLAGS='-O2 -g3' --enable-debug --with-openssl --with-uuid=ossp --with-icu \
|
||||
--with-libxml --with-libxslt && \
|
||||
export CONFIGURE_CMD="./configure CFLAGS='-O2 -g3' --enable-debug --with-openssl --with-uuid=ossp \
|
||||
--with-icu --with-libxml --with-libxslt --with-lz4" && \
|
||||
if [ "${PG_VERSION}" != "v14" ]; then \
|
||||
# zstd is available only from PG15
|
||||
export CONFIGURE_CMD="${CONFIGURE_CMD} --with-zstd"; \
|
||||
fi && \
|
||||
eval $CONFIGURE_CMD && \
|
||||
make MAKELEVEL=0 -j $(getconf _NPROCESSORS_ONLN) -s install && \
|
||||
make MAKELEVEL=0 -j $(getconf _NPROCESSORS_ONLN) -s -C contrib/ install && \
|
||||
# Install headers
|
||||
@@ -60,6 +65,7 @@ RUN apt update && \
|
||||
|
||||
# SFCGAL > 1.3 requires CGAL > 5.2, Bullseye's libcgal-dev is 5.2
|
||||
RUN wget https://gitlab.com/Oslandia/SFCGAL/-/archive/v1.3.10/SFCGAL-v1.3.10.tar.gz -O SFCGAL.tar.gz && \
|
||||
echo "4e39b3b2adada6254a7bdba6d297bb28e1a9835a9f879b74f37e2dab70203232 SFCGAL.tar.gz" | sha256sum --check && \
|
||||
mkdir sfcgal-src && cd sfcgal-src && tar xvzf ../SFCGAL.tar.gz --strip-components=1 -C . && \
|
||||
cmake . && make -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
DESTDIR=/sfcgal make install -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
@@ -68,6 +74,7 @@ RUN wget https://gitlab.com/Oslandia/SFCGAL/-/archive/v1.3.10/SFCGAL-v1.3.10.tar
|
||||
ENV PATH "/usr/local/pgsql/bin:$PATH"
|
||||
|
||||
RUN wget https://download.osgeo.org/postgis/source/postgis-3.3.2.tar.gz -O postgis.tar.gz && \
|
||||
echo "9a2a219da005a1730a39d1959a1c7cec619b1efb009b65be80ffc25bad299068 postgis.tar.gz" | sha256sum --check && \
|
||||
mkdir postgis-src && cd postgis-src && tar xvzf ../postgis.tar.gz --strip-components=1 -C . && \
|
||||
./autogen.sh && \
|
||||
./configure --with-sfcgal=/usr/local/bin/sfcgal-config && \
|
||||
@@ -84,6 +91,7 @@ RUN wget https://download.osgeo.org/postgis/source/postgis-3.3.2.tar.gz -O postg
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/address_standardizer_data_us.control
|
||||
|
||||
RUN wget https://github.com/pgRouting/pgrouting/archive/v3.4.2.tar.gz -O pgrouting.tar.gz && \
|
||||
echo "cac297c07d34460887c4f3b522b35c470138760fe358e351ad1db4edb6ee306e pgrouting.tar.gz" | sha256sum --check && \
|
||||
mkdir pgrouting-src && cd pgrouting-src && tar xvzf ../pgrouting.tar.gz --strip-components=1 -C . && \
|
||||
mkdir build && \
|
||||
cd build && \
|
||||
@@ -104,6 +112,7 @@ RUN apt update && \
|
||||
apt install -y ninja-build python3-dev libncurses5 binutils clang
|
||||
|
||||
RUN wget https://github.com/plv8/plv8/archive/refs/tags/v3.1.5.tar.gz -O plv8.tar.gz && \
|
||||
echo "1e108d5df639e4c189e1c5bdfa2432a521c126ca89e7e5a969d46899ca7bf106 plv8.tar.gz" | sha256sum --check && \
|
||||
mkdir plv8-src && cd plv8-src && tar xvzf ../plv8.tar.gz --strip-components=1 -C . && \
|
||||
export PATH="/usr/local/pgsql/bin:$PATH" && \
|
||||
make DOCKER=1 -j $(getconf _NPROCESSORS_ONLN) install && \
|
||||
@@ -125,11 +134,13 @@ COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
# packaged cmake is too old
|
||||
RUN wget https://github.com/Kitware/CMake/releases/download/v3.24.2/cmake-3.24.2-linux-x86_64.sh \
|
||||
-q -O /tmp/cmake-install.sh \
|
||||
&& echo "739d372726cb23129d57a539ce1432453448816e345e1545f6127296926b6754 /tmp/cmake-install.sh" | sha256sum --check \
|
||||
&& chmod u+x /tmp/cmake-install.sh \
|
||||
&& /tmp/cmake-install.sh --skip-license --prefix=/usr/local/ \
|
||||
&& rm /tmp/cmake-install.sh
|
||||
|
||||
RUN wget https://github.com/uber/h3/archive/refs/tags/v4.1.0.tar.gz -O h3.tar.gz && \
|
||||
echo "ec99f1f5974846bde64f4513cf8d2ea1b8d172d2218ab41803bf6a63532272bc h3.tar.gz" | sha256sum --check && \
|
||||
mkdir h3-src && cd h3-src && tar xvzf ../h3.tar.gz --strip-components=1 -C . && \
|
||||
mkdir build && cd build && \
|
||||
cmake .. -DCMAKE_BUILD_TYPE=Release && \
|
||||
@@ -139,6 +150,7 @@ RUN wget https://github.com/uber/h3/archive/refs/tags/v4.1.0.tar.gz -O h3.tar.gz
|
||||
rm -rf build
|
||||
|
||||
RUN wget https://github.com/zachasme/h3-pg/archive/refs/tags/v4.1.2.tar.gz -O h3-pg.tar.gz && \
|
||||
echo "c135aa45999b2ad1326d2537c1cadef96d52660838e4ca371706c08fdea1a956 h3-pg.tar.gz" | sha256sum --check && \
|
||||
mkdir h3-pg-src && cd h3-pg-src && tar xvzf ../h3-pg.tar.gz --strip-components=1 -C . && \
|
||||
export PATH="/usr/local/pgsql/bin:$PATH" && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
@@ -156,6 +168,7 @@ FROM build-deps AS unit-pg-build
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
RUN wget https://github.com/df7cb/postgresql-unit/archive/refs/tags/7.7.tar.gz -O postgresql-unit.tar.gz && \
|
||||
echo "411d05beeb97e5a4abf17572bfcfbb5a68d98d1018918feff995f6ee3bb03e79 postgresql-unit.tar.gz" | sha256sum --check && \
|
||||
mkdir postgresql-unit-src && cd postgresql-unit-src && tar xvzf ../postgresql-unit.tar.gz --strip-components=1 -C . && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
|
||||
@@ -176,6 +189,7 @@ FROM build-deps AS vector-pg-build
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
RUN wget https://github.com/pgvector/pgvector/archive/refs/tags/v0.4.0.tar.gz -O pgvector.tar.gz && \
|
||||
echo "b76cf84ddad452cc880a6c8c661d137ddd8679c000a16332f4f03ecf6e10bcc8 pgvector.tar.gz" | sha256sum --check && \
|
||||
mkdir pgvector-src && cd pgvector-src && tar xvzf ../pgvector.tar.gz --strip-components=1 -C . && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
|
||||
@@ -192,6 +206,7 @@ COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
# 9742dab1b2f297ad3811120db7b21451bca2d3c9 made on 13/11/2021
|
||||
RUN wget https://github.com/michelp/pgjwt/archive/9742dab1b2f297ad3811120db7b21451bca2d3c9.tar.gz -O pgjwt.tar.gz && \
|
||||
echo "cfdefb15007286f67d3d45510f04a6a7a495004be5b3aecb12cda667e774203f pgjwt.tar.gz" | sha256sum --check && \
|
||||
mkdir pgjwt-src && cd pgjwt-src && tar xvzf ../pgjwt.tar.gz --strip-components=1 -C . && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pgjwt.control
|
||||
@@ -206,6 +221,7 @@ FROM build-deps AS hypopg-pg-build
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
RUN wget https://github.com/HypoPG/hypopg/archive/refs/tags/1.3.1.tar.gz -O hypopg.tar.gz && \
|
||||
echo "e7f01ee0259dc1713f318a108f987663d60f3041948c2ada57a94b469565ca8e hypopg.tar.gz" | sha256sum --check && \
|
||||
mkdir hypopg-src && cd hypopg-src && tar xvzf ../hypopg.tar.gz --strip-components=1 -C . && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
|
||||
@@ -221,6 +237,7 @@ FROM build-deps AS pg-hashids-pg-build
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
RUN wget https://github.com/iCyberon/pg_hashids/archive/refs/tags/v1.2.1.tar.gz -O pg_hashids.tar.gz && \
|
||||
echo "74576b992d9277c92196dd8d816baa2cc2d8046fe102f3dcd7f3c3febed6822a pg_hashids.tar.gz" | sha256sum --check && \
|
||||
mkdir pg_hashids-src && cd pg_hashids-src && tar xvzf ../pg_hashids.tar.gz --strip-components=1 -C . && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config USE_PGXS=1 && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config USE_PGXS=1 && \
|
||||
@@ -236,6 +253,7 @@ FROM build-deps AS rum-pg-build
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
RUN wget https://github.com/postgrespro/rum/archive/refs/tags/1.3.13.tar.gz -O rum.tar.gz && \
|
||||
echo "6ab370532c965568df6210bd844ac6ba649f53055e48243525b0b7e5c4d69a7d rum.tar.gz" | sha256sum --check && \
|
||||
mkdir rum-src && cd rum-src && tar xvzf ../rum.tar.gz --strip-components=1 -C . && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config USE_PGXS=1 && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config USE_PGXS=1 && \
|
||||
@@ -251,11 +269,28 @@ FROM build-deps AS pgtap-pg-build
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
RUN wget https://github.com/theory/pgtap/archive/refs/tags/v1.2.0.tar.gz -O pgtap.tar.gz && \
|
||||
echo "9c7c3de67ea41638e14f06da5da57bac6f5bd03fea05c165a0ec862205a5c052 pgtap.tar.gz" | sha256sum --check && \
|
||||
mkdir pgtap-src && cd pgtap-src && tar xvzf ../pgtap.tar.gz --strip-components=1 -C . && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pgtap.control
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "ip4r-pg-build"
|
||||
# compile ip4r extension
|
||||
#
|
||||
#########################################################################################
|
||||
FROM build-deps AS ip4r-pg-build
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
RUN wget https://github.com/RhodiumToad/ip4r/archive/refs/tags/2.4.1.tar.gz -O ip4r.tar.gz && \
|
||||
echo "78b9f0c1ae45c22182768fe892a32d533c82281035e10914111400bf6301c726 ip4r.tar.gz" | sha256sum --check && \
|
||||
mkdir ip4r-src && cd ip4r-src && tar xvzf ../ip4r.tar.gz --strip-components=1 -C . && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/ip4r.control
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "prefix-pg-build"
|
||||
@@ -266,6 +301,7 @@ FROM build-deps AS prefix-pg-build
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
RUN wget https://github.com/dimitri/prefix/archive/refs/tags/v1.2.9.tar.gz -O prefix.tar.gz && \
|
||||
echo "38d30a08d0241a8bbb8e1eb8f0152b385051665a8e621c8899e7c5068f8b511e prefix.tar.gz" | sha256sum --check && \
|
||||
mkdir prefix-src && cd prefix-src && tar xvzf ../prefix.tar.gz --strip-components=1 -C . && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
|
||||
@@ -281,6 +317,7 @@ FROM build-deps AS hll-pg-build
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
RUN wget https://github.com/citusdata/postgresql-hll/archive/refs/tags/v2.17.tar.gz -O hll.tar.gz && \
|
||||
echo "9a18288e884f197196b0d29b9f178ba595b0dfc21fbf7a8699380e77fa04c1e9 hll.tar.gz" | sha256sum --check && \
|
||||
mkdir hll-src && cd hll-src && tar xvzf ../hll.tar.gz --strip-components=1 -C . && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
|
||||
@@ -296,6 +333,7 @@ FROM build-deps AS plpgsql-check-pg-build
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
RUN wget https://github.com/okbob/plpgsql_check/archive/refs/tags/v2.3.2.tar.gz -O plpgsql_check.tar.gz && \
|
||||
echo "9d81167c4bbeb74eebf7d60147b21961506161addc2aee537f95ad8efeae427b plpgsql_check.tar.gz" | sha256sum --check && \
|
||||
mkdir plpgsql_check-src && cd plpgsql_check-src && tar xvzf ../plpgsql_check.tar.gz --strip-components=1 -C . && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config USE_PGXS=1 && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config USE_PGXS=1 && \
|
||||
@@ -315,6 +353,7 @@ ENV PATH "/usr/local/pgsql/bin:$PATH"
|
||||
RUN apt-get update && \
|
||||
apt-get install -y cmake && \
|
||||
wget https://github.com/timescale/timescaledb/archive/refs/tags/2.10.1.tar.gz -O timescaledb.tar.gz && \
|
||||
echo "6fca72a6ed0f6d32d2b3523951ede73dc5f9b0077b38450a029a5f411fdb8c73 timescaledb.tar.gz" | sha256sum --check && \
|
||||
mkdir timescaledb-src && cd timescaledb-src && tar xvzf ../timescaledb.tar.gz --strip-components=1 -C . && \
|
||||
./bootstrap -DSEND_TELEMETRY_DEFAULT:BOOL=OFF -DUSE_TELEMETRY:BOOL=OFF -DAPACHE_ONLY:BOOL=ON && \
|
||||
cd build && \
|
||||
@@ -323,7 +362,39 @@ RUN apt-get update && \
|
||||
echo "trusted = true" >> /usr/local/pgsql/share/extension/timescaledb.control
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
#
|
||||
# Layer "pg-hint-plan-pg-build"
|
||||
# compile pg_hint_plan extension
|
||||
#
|
||||
#########################################################################################
|
||||
FROM build-deps AS pg-hint-plan-pg-build
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
ARG PG_VERSION
|
||||
ENV PATH "/usr/local/pgsql/bin:$PATH"
|
||||
|
||||
RUN case "${PG_VERSION}" in \
|
||||
"v14") \
|
||||
export PG_HINT_PLAN_VERSION=14_1_4_1 \
|
||||
export PG_HINT_PLAN_CHECKSUM=c3501becf70ead27f70626bce80ea401ceac6a77e2083ee5f3ff1f1444ec1ad1 \
|
||||
;; \
|
||||
"v15") \
|
||||
export PG_HINT_PLAN_VERSION=15_1_5_0 \
|
||||
export PG_HINT_PLAN_CHECKSUM=564cbbf4820973ffece63fbf76e3c0af62c4ab23543142c7caaa682bc48918be \
|
||||
;; \
|
||||
*) \
|
||||
echo "Export the valid PG_HINT_PLAN_VERSION variable" && exit 1 \
|
||||
;; \
|
||||
esac && \
|
||||
wget https://github.com/ossc-db/pg_hint_plan/archive/refs/tags/REL${PG_HINT_PLAN_VERSION}.tar.gz -O pg_hint_plan.tar.gz && \
|
||||
echo "${PG_HINT_PLAN_CHECKSUM} pg_hint_plan.tar.gz" | sha256sum --check && \
|
||||
mkdir pg_hint_plan-src && cd pg_hint_plan-src && tar xvzf ../pg_hint_plan.tar.gz --strip-components=1 -C . && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
make install -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
echo "trusted = true" >> /usr/local/pgsql/share/extension/pg_hint_plan.control
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "rust extensions"
|
||||
# This layer is used to build `pgx` deps
|
||||
#
|
||||
@@ -351,7 +422,7 @@ RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux
|
||||
USER root
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
#
|
||||
# Layer "pg-jsonschema-pg-build"
|
||||
# Compile "pg_jsonschema" extension
|
||||
#
|
||||
@@ -359,15 +430,17 @@ USER root
|
||||
|
||||
FROM rust-extensions-build AS pg-jsonschema-pg-build
|
||||
|
||||
# there is no release tag yet, but we need it due to the superuser fix in the control file
|
||||
# caeab60d70b2fd3ae421ec66466a3abbb37b7ee6 made on 06/03/2023
|
||||
# there is no release tag yet, but we need it due to the superuser fix in the control file, switch to git tag after release >= 0.1.5
|
||||
RUN wget https://github.com/supabase/pg_jsonschema/archive/caeab60d70b2fd3ae421ec66466a3abbb37b7ee6.tar.gz -O pg_jsonschema.tar.gz && \
|
||||
echo "54129ce2e7ee7a585648dbb4cef6d73f795d94fe72f248ac01119992518469a4 pg_jsonschema.tar.gz" | sha256sum --check && \
|
||||
mkdir pg_jsonschema-src && cd pg_jsonschema-src && tar xvzf ../pg_jsonschema.tar.gz --strip-components=1 -C . && \
|
||||
sed -i 's/pgx = "0.7.1"/pgx = { version = "0.7.3", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
|
||||
cargo pgx install --release && \
|
||||
echo "trusted = true" >> /usr/local/pgsql/share/extension/pg_jsonschema.control
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
#
|
||||
# Layer "pg-graphql-pg-build"
|
||||
# Compile "pg_graphql" extension
|
||||
#
|
||||
@@ -375,11 +448,13 @@ RUN wget https://github.com/supabase/pg_jsonschema/archive/caeab60d70b2fd3ae421e
|
||||
|
||||
FROM rust-extensions-build AS pg-graphql-pg-build
|
||||
|
||||
# b4988843647450a153439be367168ed09971af85 made on 22/02/2023 (from remove-pgx-contrib-spiext branch)
|
||||
# Currently pgx version bump to >= 0.7.2 causes "call to unsafe function" compliation errors in
|
||||
# pgx-contrib-spiext. There is a branch that removes that dependency, so use it. It is on the
|
||||
# same 1.1 version we've used before.
|
||||
RUN git clone -b remove-pgx-contrib-spiext --single-branch https://github.com/yrashk/pg_graphql && \
|
||||
cd pg_graphql && \
|
||||
RUN wget https://github.com/yrashk/pg_graphql/archive/b4988843647450a153439be367168ed09971af85.tar.gz -O pg_graphql.tar.gz && \
|
||||
echo "0c7b0e746441b2ec24187d0e03555faf935c2159e2839bddd14df6dafbc8c9bd pg_graphql.tar.gz" | sha256sum --check && \
|
||||
mkdir pg_graphql-src && cd pg_graphql-src && tar xvzf ../pg_graphql.tar.gz --strip-components=1 -C . && \
|
||||
sed -i 's/pgx = "~0.7.1"/pgx = { version = "0.7.3", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
|
||||
sed -i 's/pgx-tests = "~0.7.1"/pgx-tests = "0.7.3"/g' Cargo.toml && \
|
||||
cargo pgx install --release && \
|
||||
@@ -396,8 +471,10 @@ RUN git clone -b remove-pgx-contrib-spiext --single-branch https://github.com/yr
|
||||
|
||||
FROM rust-extensions-build AS pg-tiktoken-pg-build
|
||||
|
||||
RUN git clone --depth=1 --single-branch https://github.com/kelvich/pg_tiktoken && \
|
||||
cd pg_tiktoken && \
|
||||
# 801f84f08c6881c8aa30f405fafbf00eec386a72 made on 10/03/2023
|
||||
RUN wget https://github.com/kelvich/pg_tiktoken/archive/801f84f08c6881c8aa30f405fafbf00eec386a72.tar.gz -O pg_tiktoken.tar.gz && \
|
||||
echo "52f60ac800993a49aa8c609961842b611b6b1949717b69ce2ec9117117e16e4a pg_tiktoken.tar.gz" | sha256sum --check && \
|
||||
mkdir pg_tiktoken-src && cd pg_tiktoken-src && tar xvzf ../pg_tiktoken.tar.gz --strip-components=1 -C . && \
|
||||
cargo pgx install --release && \
|
||||
echo "trusted = true" >> /usr/local/pgsql/share/extension/pg_tiktoken.control
|
||||
|
||||
@@ -423,10 +500,12 @@ COPY --from=hypopg-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg-hashids-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=rum-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pgtap-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=ip4r-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=prefix-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=hll-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=plpgsql-check-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=timescaledb-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg-hint-plan-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY pgxn/ pgxn/
|
||||
|
||||
RUN make -j $(getconf _NPROCESSORS_ONLN) \
|
||||
@@ -491,13 +570,17 @@ COPY --from=compute-tools --chown=postgres /home/nonroot/target/release-line-deb
|
||||
# Install:
|
||||
# libreadline8 for psql
|
||||
# libicu67, locales for collations (including ICU and plpgsql_check)
|
||||
# liblz4-1 for lz4
|
||||
# libossp-uuid16 for extension ossp-uuid
|
||||
# libgeos, libgdal, libsfcgal1, libproj and libprotobuf-c1 for PostGIS
|
||||
# libxml2, libxslt1.1 for xml2
|
||||
# libzstd1 for zstd
|
||||
RUN apt update && \
|
||||
apt install --no-install-recommends -y \
|
||||
gdb \
|
||||
locales \
|
||||
libicu67 \
|
||||
liblz4-1 \
|
||||
libreadline8 \
|
||||
libossp-uuid16 \
|
||||
libgeos-c1v5 \
|
||||
@@ -507,7 +590,8 @@ RUN apt update && \
|
||||
libsfcgal1 \
|
||||
libxml2 \
|
||||
libxslt1.1 \
|
||||
gdb && \
|
||||
libzstd1 \
|
||||
procps && \
|
||||
rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* && \
|
||||
localedef -i en_US -c -f UTF-8 -A /usr/share/locale/locale.alias en_US.UTF-8
|
||||
|
||||
|
||||
22
README.md
22
README.md
@@ -147,15 +147,15 @@ Created an initial timeline 'de200bd42b49cc1814412c7e592dd6e9' at Lsn 0/16B5A50
|
||||
Setting tenant 9ef87a5bf0d92544f6fafeeb3239695c as a default one
|
||||
|
||||
# start postgres compute node
|
||||
> ./target/debug/neon_local pg start main
|
||||
Starting new postgres (v14) main on timeline de200bd42b49cc1814412c7e592dd6e9 ...
|
||||
> ./target/debug/neon_local endpoint start main
|
||||
Starting new endpoint main (PostgreSQL v14) on timeline de200bd42b49cc1814412c7e592dd6e9 ...
|
||||
Extracting base backup to create postgres instance: path=.neon/pgdatadirs/tenants/9ef87a5bf0d92544f6fafeeb3239695c/main port=55432
|
||||
Starting postgres node at 'host=127.0.0.1 port=55432 user=cloud_admin dbname=postgres'
|
||||
Starting postgres at 'host=127.0.0.1 port=55432 user=cloud_admin dbname=postgres'
|
||||
|
||||
# check list of running postgres instances
|
||||
> ./target/debug/neon_local pg list
|
||||
NODE ADDRESS TIMELINE BRANCH NAME LSN STATUS
|
||||
main 127.0.0.1:55432 de200bd42b49cc1814412c7e592dd6e9 main 0/16B5BA8 running
|
||||
> ./target/debug/neon_local endpoint list
|
||||
ENDPOINT ADDRESS TIMELINE BRANCH NAME LSN STATUS
|
||||
main 127.0.0.1:55432 de200bd42b49cc1814412c7e592dd6e9 main 0/16B5BA8 running
|
||||
```
|
||||
|
||||
2. Now, it is possible to connect to postgres and run some queries:
|
||||
@@ -184,14 +184,14 @@ Created timeline 'b3b863fa45fa9e57e615f9f2d944e601' at Lsn 0/16F9A00 for tenant:
|
||||
(L) ┗━ @0/16F9A00: migration_check [b3b863fa45fa9e57e615f9f2d944e601]
|
||||
|
||||
# start postgres on that branch
|
||||
> ./target/debug/neon_local pg start migration_check --branch-name migration_check
|
||||
Starting new postgres migration_check on timeline b3b863fa45fa9e57e615f9f2d944e601 ...
|
||||
> ./target/debug/neon_local endpoint start migration_check --branch-name migration_check
|
||||
Starting new endpoint migration_check (PostgreSQL v14) on timeline b3b863fa45fa9e57e615f9f2d944e601 ...
|
||||
Extracting base backup to create postgres instance: path=.neon/pgdatadirs/tenants/9ef87a5bf0d92544f6fafeeb3239695c/migration_check port=55433
|
||||
Starting postgres node at 'host=127.0.0.1 port=55433 user=cloud_admin dbname=postgres'
|
||||
Starting postgres at 'host=127.0.0.1 port=55433 user=cloud_admin dbname=postgres'
|
||||
|
||||
# check the new list of running postgres instances
|
||||
> ./target/debug/neon_local pg list
|
||||
NODE ADDRESS TIMELINE BRANCH NAME LSN STATUS
|
||||
> ./target/debug/neon_local endpoint list
|
||||
ENDPOINT ADDRESS TIMELINE BRANCH NAME LSN STATUS
|
||||
main 127.0.0.1:55432 de200bd42b49cc1814412c7e592dd6e9 main 0/16F9A38 running
|
||||
migration_check 127.0.0.1:55433 b3b863fa45fa9e57e615f9f2d944e601 migration_check 0/16F9A70 running
|
||||
|
||||
|
||||
@@ -46,6 +46,7 @@ use url::Url;
|
||||
use compute_api::responses::ComputeStatus;
|
||||
|
||||
use compute_tools::compute::{ComputeNode, ComputeState, ParsedSpec};
|
||||
use compute_tools::configurator::launch_configurator;
|
||||
use compute_tools::http::api::launch_http_server;
|
||||
use compute_tools::logger::*;
|
||||
use compute_tools::monitor::launch_monitor;
|
||||
@@ -175,6 +176,8 @@ fn main() -> Result<()> {
|
||||
|
||||
// Launch remaining service threads
|
||||
let _monitor_handle = launch_monitor(&compute).expect("cannot launch compute monitor thread");
|
||||
let _configurator_handle =
|
||||
launch_configurator(&compute).expect("cannot launch configurator thread");
|
||||
|
||||
// Start Postgres
|
||||
let mut delay_exit = false;
|
||||
|
||||
@@ -1,12 +1,28 @@
|
||||
use anyhow::{anyhow, Result};
|
||||
use postgres::Client;
|
||||
use tokio_postgres::NoTls;
|
||||
use tracing::{error, instrument};
|
||||
|
||||
use crate::compute::ComputeNode;
|
||||
|
||||
/// Update timestamp in a row in a special service table to check
|
||||
/// that we can actually write some data in this particular timeline.
|
||||
/// Create table if it's missing.
|
||||
#[instrument(skip_all)]
|
||||
pub fn create_writability_check_data(client: &mut Client) -> Result<()> {
|
||||
pub async fn check_writability(compute: &ComputeNode) -> Result<()> {
|
||||
// Connect to the database.
|
||||
let (client, connection) = tokio_postgres::connect(compute.connstr.as_str(), NoTls).await?;
|
||||
if client.is_closed() {
|
||||
return Err(anyhow!("connection to postgres closed"));
|
||||
}
|
||||
|
||||
// The connection object performs the actual communication with the database,
|
||||
// so spawn it off to run on its own.
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = connection.await {
|
||||
error!("connection error: {}", e);
|
||||
}
|
||||
});
|
||||
|
||||
let query = "
|
||||
CREATE TABLE IF NOT EXISTS health_check (
|
||||
id serial primary key,
|
||||
@@ -15,31 +31,15 @@ pub fn create_writability_check_data(client: &mut Client) -> Result<()> {
|
||||
INSERT INTO health_check VALUES (1, now())
|
||||
ON CONFLICT (id) DO UPDATE
|
||||
SET updated_at = now();";
|
||||
let result = client.simple_query(query)?;
|
||||
if result.len() < 2 {
|
||||
return Err(anyhow::format_err!("executed {} queries", result.len()));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
pub async fn check_writability(compute: &ComputeNode) -> Result<()> {
|
||||
let (client, connection) = tokio_postgres::connect(compute.connstr.as_str(), NoTls).await?;
|
||||
if client.is_closed() {
|
||||
return Err(anyhow!("connection to postgres closed"));
|
||||
}
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = connection.await {
|
||||
error!("connection error: {}", e);
|
||||
}
|
||||
});
|
||||
|
||||
let result = client
|
||||
.simple_query("UPDATE health_check SET updated_at = now() WHERE id = 1;")
|
||||
.await?;
|
||||
|
||||
if result.len() != 1 {
|
||||
return Err(anyhow!("statement can't be executed"));
|
||||
|
||||
let result = client.simple_query(query).await?;
|
||||
|
||||
if result.len() != 2 {
|
||||
return Err(anyhow::format_err!(
|
||||
"expected 2 query results, but got {}",
|
||||
result.len()
|
||||
));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -32,7 +32,6 @@ use utils::lsn::Lsn;
|
||||
use compute_api::responses::{ComputeMetrics, ComputeStatus};
|
||||
use compute_api::spec::ComputeSpec;
|
||||
|
||||
use crate::checker::create_writability_check_data;
|
||||
use crate::config;
|
||||
use crate::pg_helpers::*;
|
||||
use crate::spec::*;
|
||||
@@ -342,7 +341,6 @@ impl ComputeNode {
|
||||
handle_databases(spec, &mut client)?;
|
||||
handle_role_deletions(spec, self.connstr.as_str(), &mut client)?;
|
||||
handle_grants(spec, self.connstr.as_str(), &mut client)?;
|
||||
create_writability_check_data(&mut client)?;
|
||||
handle_extensions(spec, &mut client)?;
|
||||
|
||||
// 'Close' connection
|
||||
@@ -356,6 +354,48 @@ impl ComputeNode {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// We could've wrapped this around `pg_ctl reload`, but right now we don't use
|
||||
// `pg_ctl` for start / stop, so this just seems much easier to do as we already
|
||||
// have opened connection to Postgres and superuser access.
|
||||
#[instrument(skip(self, client))]
|
||||
fn pg_reload_conf(&self, client: &mut Client) -> Result<()> {
|
||||
client.simple_query("SELECT pg_reload_conf()")?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Similar to `apply_config()`, but does a bit different sequence of operations,
|
||||
/// as it's used to reconfigure a previously started and configured Postgres node.
|
||||
#[instrument(skip(self))]
|
||||
pub fn reconfigure(&self) -> Result<()> {
|
||||
let spec = self.state.lock().unwrap().pspec.clone().unwrap().spec;
|
||||
|
||||
// Write new config
|
||||
let pgdata_path = Path::new(&self.pgdata);
|
||||
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &spec)?;
|
||||
|
||||
let mut client = Client::connect(self.connstr.as_str(), NoTls)?;
|
||||
self.pg_reload_conf(&mut client)?;
|
||||
|
||||
// Proceed with post-startup configuration. Note, that order of operations is important.
|
||||
handle_roles(&spec, &mut client)?;
|
||||
handle_databases(&spec, &mut client)?;
|
||||
handle_role_deletions(&spec, self.connstr.as_str(), &mut client)?;
|
||||
handle_grants(&spec, self.connstr.as_str(), &mut client)?;
|
||||
handle_extensions(&spec, &mut client)?;
|
||||
|
||||
// 'Close' connection
|
||||
drop(client);
|
||||
|
||||
let unknown_op = "unknown".to_string();
|
||||
let op_id = spec.operation_uuid.as_ref().unwrap_or(&unknown_op);
|
||||
info!(
|
||||
"finished reconfiguration of compute node for operation {}",
|
||||
op_id
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip(self))]
|
||||
pub fn start_compute(&self) -> Result<std::process::Child> {
|
||||
let compute_state = self.state.lock().unwrap().clone();
|
||||
|
||||
54
compute_tools/src/configurator.rs
Normal file
54
compute_tools/src/configurator.rs
Normal file
@@ -0,0 +1,54 @@
|
||||
use std::sync::Arc;
|
||||
use std::thread;
|
||||
|
||||
use anyhow::Result;
|
||||
use tracing::{error, info, instrument};
|
||||
|
||||
use compute_api::responses::ComputeStatus;
|
||||
|
||||
use crate::compute::ComputeNode;
|
||||
|
||||
#[instrument(skip(compute))]
|
||||
fn configurator_main_loop(compute: &Arc<ComputeNode>) {
|
||||
info!("waiting for reconfiguration requests");
|
||||
loop {
|
||||
let state = compute.state.lock().unwrap();
|
||||
let mut state = compute.state_changed.wait(state).unwrap();
|
||||
|
||||
if state.status == ComputeStatus::ConfigurationPending {
|
||||
info!("got configuration request");
|
||||
state.status = ComputeStatus::Configuration;
|
||||
compute.state_changed.notify_all();
|
||||
drop(state);
|
||||
|
||||
let mut new_status = ComputeStatus::Failed;
|
||||
if let Err(e) = compute.reconfigure() {
|
||||
error!("could not configure compute node: {}", e);
|
||||
} else {
|
||||
new_status = ComputeStatus::Running;
|
||||
info!("compute node configured");
|
||||
}
|
||||
|
||||
// XXX: used to test that API is blocking
|
||||
// std::thread::sleep(std::time::Duration::from_millis(10000));
|
||||
|
||||
compute.set_status(new_status);
|
||||
} else if state.status == ComputeStatus::Failed {
|
||||
info!("compute node is now in Failed state, exiting");
|
||||
break;
|
||||
} else {
|
||||
info!("woken up for compute status: {:?}, sleeping", state.status);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn launch_configurator(compute: &Arc<ComputeNode>) -> Result<thread::JoinHandle<()>> {
|
||||
let compute = Arc::clone(compute);
|
||||
|
||||
Ok(thread::Builder::new()
|
||||
.name("compute-configurator".into())
|
||||
.spawn(move || {
|
||||
configurator_main_loop(&compute);
|
||||
info!("configurator thread is exited");
|
||||
})?)
|
||||
}
|
||||
@@ -85,7 +85,10 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
|
||||
let res = crate::checker::check_writability(compute).await;
|
||||
match res {
|
||||
Ok(_) => Response::new(Body::from("true")),
|
||||
Err(e) => Response::new(Body::from(e.to_string())),
|
||||
Err(e) => {
|
||||
error!("check_writability failed: {}", e);
|
||||
Response::new(Body::from(e.to_string()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -155,7 +158,7 @@ async fn handle_configure_request(
|
||||
// ```
|
||||
{
|
||||
let mut state = compute.state.lock().unwrap();
|
||||
if state.status != ComputeStatus::Empty {
|
||||
if state.status != ComputeStatus::Empty && state.status != ComputeStatus::Running {
|
||||
let msg = format!(
|
||||
"invalid compute status for configuration request: {:?}",
|
||||
state.status.clone()
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
//!
|
||||
pub mod checker;
|
||||
pub mod config;
|
||||
pub mod configurator;
|
||||
pub mod http;
|
||||
#[macro_use]
|
||||
pub mod logger;
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use std::path::Path;
|
||||
use std::str::FromStr;
|
||||
|
||||
use anyhow::Result;
|
||||
use anyhow::{anyhow, bail, Result};
|
||||
use postgres::config::Config;
|
||||
use postgres::{Client, NoTls};
|
||||
use tracing::{info, info_span, instrument, span_enabled, warn, Level};
|
||||
@@ -10,6 +10,7 @@ use crate::config;
|
||||
use crate::params::PG_HBA_ALL_MD5;
|
||||
use crate::pg_helpers::*;
|
||||
|
||||
use compute_api::responses::ControlPlaneSpecResponse;
|
||||
use compute_api::spec::{ComputeSpec, Database, PgIdent, Role};
|
||||
|
||||
/// Request spec from the control-plane by compute_id. If `NEON_CONSOLE_JWT`
|
||||
@@ -26,13 +27,19 @@ pub fn get_spec_from_control_plane(base_uri: &str, compute_id: &str) -> Result<C
|
||||
// - network error, then retry
|
||||
// - no spec for compute yet, then wait
|
||||
// - compute id is unknown or any other error, then bail out
|
||||
let spec = reqwest::blocking::Client::new()
|
||||
let resp: ControlPlaneSpecResponse = reqwest::blocking::Client::new()
|
||||
.get(cp_uri)
|
||||
.header("Authorization", jwt)
|
||||
.send()?
|
||||
.json()?;
|
||||
.send()
|
||||
.map_err(|e| anyhow!("could not send spec request to control plane: {}", e))?
|
||||
.json()
|
||||
.map_err(|e| anyhow!("could not get compute spec from control plane: {}", e))?;
|
||||
|
||||
Ok(spec)
|
||||
if let Some(spec) = resp.spec {
|
||||
Ok(spec)
|
||||
} else {
|
||||
bail!("could not get compute spec from control plane")
|
||||
}
|
||||
}
|
||||
|
||||
/// It takes cluster specification and does the following:
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
//!
|
||||
use anyhow::{anyhow, bail, Context, Result};
|
||||
use clap::{value_parser, Arg, ArgAction, ArgMatches, Command};
|
||||
use control_plane::compute::ComputeControlPlane;
|
||||
use control_plane::endpoint::ComputeControlPlane;
|
||||
use control_plane::local_env::LocalEnv;
|
||||
use control_plane::pageserver::PageServerNode;
|
||||
use control_plane::safekeeper::SafekeeperNode;
|
||||
@@ -106,8 +106,9 @@ fn main() -> Result<()> {
|
||||
"start" => handle_start_all(sub_args, &env),
|
||||
"stop" => handle_stop_all(sub_args, &env),
|
||||
"pageserver" => handle_pageserver(sub_args, &env),
|
||||
"pg" => handle_pg(sub_args, &env),
|
||||
"safekeeper" => handle_safekeeper(sub_args, &env),
|
||||
"endpoint" => handle_endpoint(sub_args, &env),
|
||||
"pg" => bail!("'pg' subcommand has been renamed to 'endpoint'"),
|
||||
_ => bail!("unexpected subcommand {sub_name}"),
|
||||
};
|
||||
|
||||
@@ -470,10 +471,10 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
|
||||
let mut cplane = ComputeControlPlane::load(env.clone())?;
|
||||
println!("Importing timeline into pageserver ...");
|
||||
pageserver.timeline_import(tenant_id, timeline_id, base, pg_wal, pg_version)?;
|
||||
println!("Creating node for imported timeline ...");
|
||||
env.register_branch_mapping(name.to_string(), tenant_id, timeline_id)?;
|
||||
|
||||
cplane.new_node(tenant_id, name, timeline_id, None, None, pg_version)?;
|
||||
println!("Creating endpoint for imported timeline ...");
|
||||
cplane.new_endpoint(tenant_id, name, timeline_id, None, None, pg_version)?;
|
||||
println!("Done");
|
||||
}
|
||||
Some(("branch", branch_match)) => {
|
||||
@@ -521,10 +522,10 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
|
||||
let (sub_name, sub_args) = match pg_match.subcommand() {
|
||||
Some(pg_subcommand_data) => pg_subcommand_data,
|
||||
None => bail!("no pg subcommand provided"),
|
||||
fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
|
||||
let (sub_name, sub_args) = match ep_match.subcommand() {
|
||||
Some(ep_subcommand_data) => ep_subcommand_data,
|
||||
None => bail!("no endpoint subcommand provided"),
|
||||
};
|
||||
|
||||
let mut cplane = ComputeControlPlane::load(env.clone())?;
|
||||
@@ -546,7 +547,7 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
|
||||
table.load_preset(comfy_table::presets::NOTHING);
|
||||
|
||||
table.set_header([
|
||||
"NODE",
|
||||
"ENDPOINT",
|
||||
"ADDRESS",
|
||||
"TIMELINE",
|
||||
"BRANCH NAME",
|
||||
@@ -554,39 +555,39 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
|
||||
"STATUS",
|
||||
]);
|
||||
|
||||
for ((_, node_name), node) in cplane
|
||||
.nodes
|
||||
for (endpoint_id, endpoint) in cplane
|
||||
.endpoints
|
||||
.iter()
|
||||
.filter(|((node_tenant_id, _), _)| node_tenant_id == &tenant_id)
|
||||
.filter(|(_, endpoint)| endpoint.tenant_id == tenant_id)
|
||||
{
|
||||
let lsn_str = match node.lsn {
|
||||
let lsn_str = match endpoint.lsn {
|
||||
None => {
|
||||
// -> primary node
|
||||
// -> primary endpoint
|
||||
// Use the LSN at the end of the timeline.
|
||||
timeline_infos
|
||||
.get(&node.timeline_id)
|
||||
.get(&endpoint.timeline_id)
|
||||
.map(|bi| bi.last_record_lsn.to_string())
|
||||
.unwrap_or_else(|| "?".to_string())
|
||||
}
|
||||
Some(lsn) => {
|
||||
// -> read-only node
|
||||
// Use the node's LSN.
|
||||
// -> read-only endpoint
|
||||
// Use the endpoint's LSN.
|
||||
lsn.to_string()
|
||||
}
|
||||
};
|
||||
|
||||
let branch_name = timeline_name_mappings
|
||||
.get(&TenantTimelineId::new(tenant_id, node.timeline_id))
|
||||
.get(&TenantTimelineId::new(tenant_id, endpoint.timeline_id))
|
||||
.map(|name| name.as_str())
|
||||
.unwrap_or("?");
|
||||
|
||||
table.add_row([
|
||||
node_name.as_str(),
|
||||
&node.address.to_string(),
|
||||
&node.timeline_id.to_string(),
|
||||
endpoint_id.as_str(),
|
||||
&endpoint.address.to_string(),
|
||||
&endpoint.timeline_id.to_string(),
|
||||
branch_name,
|
||||
lsn_str.as_str(),
|
||||
node.status(),
|
||||
endpoint.status(),
|
||||
]);
|
||||
}
|
||||
|
||||
@@ -597,10 +598,10 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
|
||||
.get_one::<String>("branch-name")
|
||||
.map(|s| s.as_str())
|
||||
.unwrap_or(DEFAULT_BRANCH_NAME);
|
||||
let node_name = sub_args
|
||||
.get_one::<String>("node")
|
||||
.map(|node_name| node_name.to_string())
|
||||
.unwrap_or_else(|| format!("{branch_name}_node"));
|
||||
let endpoint_id = sub_args
|
||||
.get_one::<String>("endpoint_id")
|
||||
.map(String::to_string)
|
||||
.unwrap_or_else(|| format!("ep-{branch_name}"));
|
||||
|
||||
let lsn = sub_args
|
||||
.get_one::<String>("lsn")
|
||||
@@ -618,15 +619,15 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
|
||||
.copied()
|
||||
.context("Failed to parse postgres version from the argument string")?;
|
||||
|
||||
cplane.new_node(tenant_id, &node_name, timeline_id, lsn, port, pg_version)?;
|
||||
cplane.new_endpoint(tenant_id, &endpoint_id, timeline_id, lsn, port, pg_version)?;
|
||||
}
|
||||
"start" => {
|
||||
let port: Option<u16> = sub_args.get_one::<u16>("port").copied();
|
||||
let node_name = sub_args
|
||||
.get_one::<String>("node")
|
||||
.ok_or_else(|| anyhow!("No node name was provided to start"))?;
|
||||
let endpoint_id = sub_args
|
||||
.get_one::<String>("endpoint_id")
|
||||
.ok_or_else(|| anyhow!("No endpoint ID was provided to start"))?;
|
||||
|
||||
let node = cplane.nodes.get(&(tenant_id, node_name.to_string()));
|
||||
let endpoint = cplane.endpoints.get(endpoint_id.as_str());
|
||||
|
||||
let auth_token = if matches!(env.pageserver.pg_auth_type, AuthType::NeonJWT) {
|
||||
let claims = Claims::new(Some(tenant_id), Scope::Tenant);
|
||||
@@ -636,9 +637,9 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
|
||||
None
|
||||
};
|
||||
|
||||
if let Some(node) = node {
|
||||
println!("Starting existing postgres {node_name}...");
|
||||
node.start(&auth_token)?;
|
||||
if let Some(endpoint) = endpoint {
|
||||
println!("Starting existing endpoint {endpoint_id}...");
|
||||
endpoint.start(&auth_token)?;
|
||||
} else {
|
||||
let branch_name = sub_args
|
||||
.get_one::<String>("branch-name")
|
||||
@@ -663,27 +664,33 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
|
||||
// start --port X
|
||||
// stop
|
||||
// start <-- will also use port X even without explicit port argument
|
||||
println!("Starting new postgres (v{pg_version}) {node_name} on timeline {timeline_id} ...");
|
||||
println!("Starting new endpoint {endpoint_id} (PostgreSQL v{pg_version}) on timeline {timeline_id} ...");
|
||||
|
||||
let node =
|
||||
cplane.new_node(tenant_id, node_name, timeline_id, lsn, port, pg_version)?;
|
||||
node.start(&auth_token)?;
|
||||
let ep = cplane.new_endpoint(
|
||||
tenant_id,
|
||||
endpoint_id,
|
||||
timeline_id,
|
||||
lsn,
|
||||
port,
|
||||
pg_version,
|
||||
)?;
|
||||
ep.start(&auth_token)?;
|
||||
}
|
||||
}
|
||||
"stop" => {
|
||||
let node_name = sub_args
|
||||
.get_one::<String>("node")
|
||||
.ok_or_else(|| anyhow!("No node name was provided to stop"))?;
|
||||
let endpoint_id = sub_args
|
||||
.get_one::<String>("endpoint_id")
|
||||
.ok_or_else(|| anyhow!("No endpoint ID was provided to stop"))?;
|
||||
let destroy = sub_args.get_flag("destroy");
|
||||
|
||||
let node = cplane
|
||||
.nodes
|
||||
.get(&(tenant_id, node_name.to_string()))
|
||||
.with_context(|| format!("postgres {node_name} is not found"))?;
|
||||
node.stop(destroy)?;
|
||||
let endpoint = cplane
|
||||
.endpoints
|
||||
.get(endpoint_id.as_str())
|
||||
.with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
|
||||
endpoint.stop(destroy)?;
|
||||
}
|
||||
|
||||
_ => bail!("Unexpected pg subcommand '{sub_name}'"),
|
||||
_ => bail!("Unexpected endpoint subcommand '{sub_name}'"),
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -802,7 +809,7 @@ fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul
|
||||
}
|
||||
|
||||
fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> anyhow::Result<()> {
|
||||
// Postgres nodes are not started automatically
|
||||
// Endpoints are not started automatically
|
||||
|
||||
broker::start_broker_process(env)?;
|
||||
|
||||
@@ -836,10 +843,10 @@ fn handle_stop_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<
|
||||
fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
|
||||
let pageserver = PageServerNode::from_env(env);
|
||||
|
||||
// Stop all compute nodes
|
||||
// Stop all endpoints
|
||||
match ComputeControlPlane::load(env.clone()) {
|
||||
Ok(cplane) => {
|
||||
for (_k, node) in cplane.nodes {
|
||||
for (_k, node) in cplane.endpoints {
|
||||
if let Err(e) = node.stop(false) {
|
||||
eprintln!("postgres stop failed: {e:#}");
|
||||
}
|
||||
@@ -872,7 +879,9 @@ fn cli() -> Command {
|
||||
.help("Name of the branch to be created or used as an alias for other services")
|
||||
.required(false);
|
||||
|
||||
let pg_node_arg = Arg::new("node").help("Postgres node name").required(false);
|
||||
let endpoint_id_arg = Arg::new("endpoint_id")
|
||||
.help("Postgres endpoint id")
|
||||
.required(false);
|
||||
|
||||
let safekeeper_id_arg = Arg::new("id").help("safekeeper id").required(false);
|
||||
|
||||
@@ -1026,27 +1035,27 @@ fn cli() -> Command {
|
||||
)
|
||||
)
|
||||
.subcommand(
|
||||
Command::new("pg")
|
||||
Command::new("endpoint")
|
||||
.arg_required_else_help(true)
|
||||
.about("Manage postgres instances")
|
||||
.subcommand(Command::new("list").arg(tenant_id_arg.clone()))
|
||||
.subcommand(Command::new("create")
|
||||
.about("Create a postgres compute node")
|
||||
.arg(pg_node_arg.clone())
|
||||
.about("Create a compute endpoint")
|
||||
.arg(endpoint_id_arg.clone())
|
||||
.arg(branch_name_arg.clone())
|
||||
.arg(tenant_id_arg.clone())
|
||||
.arg(lsn_arg.clone())
|
||||
.arg(port_arg.clone())
|
||||
.arg(
|
||||
Arg::new("config-only")
|
||||
.help("Don't do basebackup, create compute node with only config files")
|
||||
.help("Don't do basebackup, create endpoint directory with only config files")
|
||||
.long("config-only")
|
||||
.required(false))
|
||||
.arg(pg_version_arg.clone())
|
||||
)
|
||||
.subcommand(Command::new("start")
|
||||
.about("Start a postgres compute node.\n This command actually creates new node from scratch, but preserves existing config files")
|
||||
.arg(pg_node_arg.clone())
|
||||
.about("Start postgres.\n If the endpoint doesn't exist yet, it is created.")
|
||||
.arg(endpoint_id_arg.clone())
|
||||
.arg(tenant_id_arg.clone())
|
||||
.arg(branch_name_arg)
|
||||
.arg(timeline_id_arg)
|
||||
@@ -1056,7 +1065,7 @@ fn cli() -> Command {
|
||||
)
|
||||
.subcommand(
|
||||
Command::new("stop")
|
||||
.arg(pg_node_arg)
|
||||
.arg(endpoint_id_arg)
|
||||
.arg(tenant_id_arg)
|
||||
.arg(
|
||||
Arg::new("destroy")
|
||||
@@ -1068,6 +1077,13 @@ fn cli() -> Command {
|
||||
)
|
||||
|
||||
)
|
||||
// Obsolete old name for 'endpoint'. We now just print an error if it's used.
|
||||
.subcommand(
|
||||
Command::new("pg")
|
||||
.hide(true)
|
||||
.arg(Arg::new("ignore-rest").allow_hyphen_values(true).num_args(0..).required(false))
|
||||
.trailing_var_arg(true)
|
||||
)
|
||||
.subcommand(
|
||||
Command::new("start")
|
||||
.about("Start page server and safekeepers")
|
||||
|
||||
@@ -25,54 +25,45 @@ use crate::postgresql_conf::PostgresConf;
|
||||
//
|
||||
pub struct ComputeControlPlane {
|
||||
base_port: u16,
|
||||
pageserver: Arc<PageServerNode>,
|
||||
pub nodes: BTreeMap<(TenantId, String), Arc<PostgresNode>>,
|
||||
|
||||
// endpoint ID is the key
|
||||
pub endpoints: BTreeMap<String, Arc<Endpoint>>,
|
||||
|
||||
env: LocalEnv,
|
||||
pageserver: Arc<PageServerNode>,
|
||||
}
|
||||
|
||||
impl ComputeControlPlane {
|
||||
// Load current nodes with ports from data directories on disk
|
||||
// Directory structure has the following layout:
|
||||
// pgdatadirs
|
||||
// |- tenants
|
||||
// | |- <tenant_id>
|
||||
// | | |- <node name>
|
||||
// Load current endpoints from the endpoints/ subdirectories
|
||||
pub fn load(env: LocalEnv) -> Result<ComputeControlPlane> {
|
||||
let pageserver = Arc::new(PageServerNode::from_env(&env));
|
||||
|
||||
let mut nodes = BTreeMap::default();
|
||||
let pgdatadirspath = &env.pg_data_dirs_path();
|
||||
|
||||
for tenant_dir in fs::read_dir(pgdatadirspath)
|
||||
.with_context(|| format!("failed to list {}", pgdatadirspath.display()))?
|
||||
let mut endpoints = BTreeMap::default();
|
||||
for endpoint_dir in fs::read_dir(env.endpoints_path())
|
||||
.with_context(|| format!("failed to list {}", env.endpoints_path().display()))?
|
||||
{
|
||||
let tenant_dir = tenant_dir?;
|
||||
for timeline_dir in fs::read_dir(tenant_dir.path())
|
||||
.with_context(|| format!("failed to list {}", tenant_dir.path().display()))?
|
||||
{
|
||||
let node = PostgresNode::from_dir_entry(timeline_dir?, &env, &pageserver)?;
|
||||
nodes.insert((node.tenant_id, node.name.clone()), Arc::new(node));
|
||||
}
|
||||
let ep = Endpoint::from_dir_entry(endpoint_dir?, &env, &pageserver)?;
|
||||
endpoints.insert(ep.name.clone(), Arc::new(ep));
|
||||
}
|
||||
|
||||
Ok(ComputeControlPlane {
|
||||
base_port: 55431,
|
||||
pageserver,
|
||||
nodes,
|
||||
endpoints,
|
||||
env,
|
||||
pageserver,
|
||||
})
|
||||
}
|
||||
|
||||
fn get_port(&mut self) -> u16 {
|
||||
1 + self
|
||||
.nodes
|
||||
.endpoints
|
||||
.values()
|
||||
.map(|node| node.address.port())
|
||||
.map(|ep| ep.address.port())
|
||||
.max()
|
||||
.unwrap_or(self.base_port)
|
||||
}
|
||||
|
||||
pub fn new_node(
|
||||
pub fn new_endpoint(
|
||||
&mut self,
|
||||
tenant_id: TenantId,
|
||||
name: &str,
|
||||
@@ -80,9 +71,9 @@ impl ComputeControlPlane {
|
||||
lsn: Option<Lsn>,
|
||||
port: Option<u16>,
|
||||
pg_version: u32,
|
||||
) -> Result<Arc<PostgresNode>> {
|
||||
) -> Result<Arc<Endpoint>> {
|
||||
let port = port.unwrap_or_else(|| self.get_port());
|
||||
let node = Arc::new(PostgresNode {
|
||||
let ep = Arc::new(Endpoint {
|
||||
name: name.to_owned(),
|
||||
address: SocketAddr::new("127.0.0.1".parse().unwrap(), port),
|
||||
env: self.env.clone(),
|
||||
@@ -93,39 +84,45 @@ impl ComputeControlPlane {
|
||||
pg_version,
|
||||
});
|
||||
|
||||
node.create_pgdata()?;
|
||||
node.setup_pg_conf()?;
|
||||
ep.create_pgdata()?;
|
||||
ep.setup_pg_conf()?;
|
||||
|
||||
self.nodes
|
||||
.insert((tenant_id, node.name.clone()), Arc::clone(&node));
|
||||
self.endpoints.insert(ep.name.clone(), Arc::clone(&ep));
|
||||
|
||||
Ok(node)
|
||||
Ok(ep)
|
||||
}
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct PostgresNode {
|
||||
pub address: SocketAddr,
|
||||
pub struct Endpoint {
|
||||
/// used as the directory name
|
||||
name: String,
|
||||
pub tenant_id: TenantId,
|
||||
pub timeline_id: TimelineId,
|
||||
// Some(lsn) if this is a read-only endpoint anchored at 'lsn'. None for the primary.
|
||||
pub lsn: Option<Lsn>,
|
||||
|
||||
// port and address of the Postgres server
|
||||
pub address: SocketAddr,
|
||||
pg_version: u32,
|
||||
|
||||
// These are not part of the endpoint as such, but the environment
|
||||
// the endpoint runs in.
|
||||
pub env: LocalEnv,
|
||||
pageserver: Arc<PageServerNode>,
|
||||
pub timeline_id: TimelineId,
|
||||
pub lsn: Option<Lsn>, // if it's a read-only node. None for primary
|
||||
pub tenant_id: TenantId,
|
||||
pg_version: u32,
|
||||
}
|
||||
|
||||
impl PostgresNode {
|
||||
impl Endpoint {
|
||||
fn from_dir_entry(
|
||||
entry: std::fs::DirEntry,
|
||||
env: &LocalEnv,
|
||||
pageserver: &Arc<PageServerNode>,
|
||||
) -> Result<PostgresNode> {
|
||||
) -> Result<Endpoint> {
|
||||
if !entry.file_type()?.is_dir() {
|
||||
anyhow::bail!(
|
||||
"PostgresNode::from_dir_entry failed: '{}' is not a directory",
|
||||
"Endpoint::from_dir_entry failed: '{}' is not a directory",
|
||||
entry.path().display()
|
||||
);
|
||||
}
|
||||
@@ -135,7 +132,7 @@ impl PostgresNode {
|
||||
let name = fname.to_str().unwrap().to_string();
|
||||
|
||||
// Read config file into memory
|
||||
let cfg_path = entry.path().join("postgresql.conf");
|
||||
let cfg_path = entry.path().join("pgdata").join("postgresql.conf");
|
||||
let cfg_path_str = cfg_path.to_string_lossy();
|
||||
let mut conf_file = File::open(&cfg_path)
|
||||
.with_context(|| format!("failed to open config file in {}", cfg_path_str))?;
|
||||
@@ -161,7 +158,7 @@ impl PostgresNode {
|
||||
conf.parse_field_optional("recovery_target_lsn", &context)?;
|
||||
|
||||
// ok now
|
||||
Ok(PostgresNode {
|
||||
Ok(Endpoint {
|
||||
address: SocketAddr::new("127.0.0.1".parse().unwrap(), port),
|
||||
name,
|
||||
env: env.clone(),
|
||||
@@ -269,7 +266,7 @@ impl PostgresNode {
|
||||
}
|
||||
|
||||
// Write postgresql.conf with default configuration
|
||||
// and PG_VERSION file to the data directory of a new node.
|
||||
// and PG_VERSION file to the data directory of a new endpoint.
|
||||
fn setup_pg_conf(&self) -> Result<()> {
|
||||
let mut conf = PostgresConf::new();
|
||||
conf.append("max_wal_senders", "10");
|
||||
@@ -289,7 +286,7 @@ impl PostgresNode {
|
||||
// walproposer panics when basebackup is invalid, it is pointless to restart in this case.
|
||||
conf.append("restart_after_crash", "off");
|
||||
|
||||
// Configure the node to fetch pages from pageserver
|
||||
// Configure the Neon Postgres extension to fetch pages from pageserver
|
||||
let pageserver_connstr = {
|
||||
let config = &self.pageserver.pg_connection_config;
|
||||
let (host, port) = (config.host(), config.port());
|
||||
@@ -325,7 +322,7 @@ impl PostgresNode {
|
||||
conf.append("max_replication_flush_lag", "10GB");
|
||||
|
||||
if !self.env.safekeepers.is_empty() {
|
||||
// Configure the node to connect to the safekeepers
|
||||
// Configure Postgres to connect to the safekeepers
|
||||
conf.append("synchronous_standby_names", "walproposer");
|
||||
|
||||
let safekeepers = self
|
||||
@@ -380,8 +377,12 @@ impl PostgresNode {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn endpoint_path(&self) -> PathBuf {
|
||||
self.env.endpoints_path().join(&self.name)
|
||||
}
|
||||
|
||||
pub fn pgdata(&self) -> PathBuf {
|
||||
self.env.pg_data_dir(&self.tenant_id, &self.name)
|
||||
self.endpoint_path().join("pgdata")
|
||||
}
|
||||
|
||||
pub fn status(&self) -> &str {
|
||||
@@ -443,12 +444,11 @@ impl PostgresNode {
|
||||
}
|
||||
|
||||
pub fn start(&self, auth_token: &Option<String>) -> Result<()> {
|
||||
// Bail if the node already running.
|
||||
if self.status() == "running" {
|
||||
anyhow::bail!("The node is already running");
|
||||
anyhow::bail!("The endpoint is already running");
|
||||
}
|
||||
|
||||
// 1. We always start compute node from scratch, so
|
||||
// 1. We always start Postgres from scratch, so
|
||||
// if old dir exists, preserve 'postgresql.conf' and drop the directory
|
||||
let postgresql_conf_path = self.pgdata().join("postgresql.conf");
|
||||
let postgresql_conf = fs::read(&postgresql_conf_path).with_context(|| {
|
||||
@@ -470,8 +470,8 @@ impl PostgresNode {
|
||||
File::create(self.pgdata().join("standby.signal"))?;
|
||||
}
|
||||
|
||||
// 4. Finally start the compute node postgres
|
||||
println!("Starting postgres node at '{}'", self.connstr());
|
||||
// 4. Finally start postgres
|
||||
println!("Starting postgres at '{}'", self.connstr());
|
||||
self.pg_ctl(&["start"], auth_token)
|
||||
}
|
||||
|
||||
@@ -480,7 +480,7 @@ impl PostgresNode {
|
||||
// use immediate shutdown mode, otherwise,
|
||||
// shutdown gracefully to leave the data directory sane.
|
||||
//
|
||||
// Compute node always starts from scratch, so stop
|
||||
// Postgres is always started from scratch, so stop
|
||||
// without destroy only used for testing and debugging.
|
||||
//
|
||||
if destroy {
|
||||
@@ -489,7 +489,7 @@ impl PostgresNode {
|
||||
"Destroying postgres data directory '{}'",
|
||||
self.pgdata().to_str().unwrap()
|
||||
);
|
||||
fs::remove_dir_all(self.pgdata())?;
|
||||
fs::remove_dir_all(self.endpoint_path())?;
|
||||
} else {
|
||||
self.pg_ctl(&["stop"], &None)?;
|
||||
}
|
||||
@@ -9,7 +9,7 @@
|
||||
|
||||
mod background_process;
|
||||
pub mod broker;
|
||||
pub mod compute;
|
||||
pub mod endpoint;
|
||||
pub mod local_env;
|
||||
pub mod pageserver;
|
||||
pub mod postgresql_conf;
|
||||
|
||||
@@ -200,14 +200,8 @@ impl LocalEnv {
|
||||
self.neon_distrib_dir.join("storage_broker")
|
||||
}
|
||||
|
||||
pub fn pg_data_dirs_path(&self) -> PathBuf {
|
||||
self.base_data_dir.join("pgdatadirs").join("tenants")
|
||||
}
|
||||
|
||||
pub fn pg_data_dir(&self, tenant_id: &TenantId, branch_name: &str) -> PathBuf {
|
||||
self.pg_data_dirs_path()
|
||||
.join(tenant_id.to_string())
|
||||
.join(branch_name)
|
||||
pub fn endpoints_path(&self) -> PathBuf {
|
||||
self.base_data_dir.join("endpoints")
|
||||
}
|
||||
|
||||
// TODO: move pageserver files into ./pageserver
|
||||
@@ -427,7 +421,7 @@ impl LocalEnv {
|
||||
}
|
||||
}
|
||||
|
||||
fs::create_dir_all(self.pg_data_dirs_path())?;
|
||||
fs::create_dir_all(self.endpoints_path())?;
|
||||
|
||||
for safekeeper in &self.safekeepers {
|
||||
fs::create_dir_all(SafekeeperNode::datadir_path_by_id(self, safekeeper.id))?;
|
||||
|
||||
@@ -368,6 +368,9 @@ impl PageServerNode {
|
||||
.map(|x| x.parse::<u64>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'min_resident_size_override' as integer")?,
|
||||
evictions_low_residence_duration_metric_threshold: settings
|
||||
.remove("evictions_low_residence_duration_metric_threshold")
|
||||
.map(|x| x.to_string()),
|
||||
};
|
||||
if !settings.is_empty() {
|
||||
bail!("Unrecognized tenant settings: {settings:?}")
|
||||
@@ -445,6 +448,9 @@ impl PageServerNode {
|
||||
.map(|x| x.parse::<u64>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'min_resident_size_override' as an integer")?,
|
||||
evictions_low_residence_duration_metric_threshold: settings
|
||||
.get("evictions_low_residence_duration_metric_threshold")
|
||||
.map(|x| x.to_string()),
|
||||
})
|
||||
.send()?
|
||||
.error_from_body()?;
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
//! Structs representing the JSON formats used in the compute_ctl's HTTP API.
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
use serde::{Serialize, Serializer};
|
||||
use serde::{Deserialize, Serialize, Serializer};
|
||||
|
||||
use crate::spec::ComputeSpec;
|
||||
|
||||
#[derive(Serialize, Debug)]
|
||||
pub struct GenericAPIError {
|
||||
@@ -43,6 +45,8 @@ pub enum ComputeStatus {
|
||||
Init,
|
||||
// Compute is configured and running.
|
||||
Running,
|
||||
// New spec is being applied.
|
||||
Configuration,
|
||||
// Either startup or configuration failed,
|
||||
// compute will exit soon or is waiting for
|
||||
// control-plane to terminate it.
|
||||
@@ -64,3 +68,11 @@ pub struct ComputeMetrics {
|
||||
pub config_ms: u64,
|
||||
pub total_startup_ms: u64,
|
||||
}
|
||||
|
||||
/// Response of the `/computes/{compute_id}/spec` control-plane API.
|
||||
/// This is not actually a compute API response, so consider moving
|
||||
/// to a different place.
|
||||
#[derive(Deserialize, Debug)]
|
||||
pub struct ControlPlaneSpecResponse {
|
||||
pub spec: Option<ComputeSpec>,
|
||||
}
|
||||
|
||||
@@ -4,13 +4,12 @@ version = "0.1.0"
|
||||
edition = "2021"
|
||||
license = "Apache-2.0"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1.0.68"
|
||||
chrono = { version = "0.4", default-features = false, features = ["clock", "serde"] }
|
||||
rand = "0.8.3"
|
||||
serde = "1.0.152"
|
||||
serde_with = "2.1.0"
|
||||
utils = { version = "0.1.0", path = "../utils" }
|
||||
workspace_hack = { version = "0.1.0", path = "../../workspace_hack" }
|
||||
anyhow.workspace = true
|
||||
chrono.workspace = true
|
||||
rand.workspace = true
|
||||
serde.workspace = true
|
||||
serde_with.workspace = true
|
||||
utils.workspace = true
|
||||
|
||||
workspace_hack.workspace = true
|
||||
|
||||
@@ -7,6 +7,7 @@ license.workspace = true
|
||||
[dependencies]
|
||||
serde.workspace = true
|
||||
serde_with.workspace = true
|
||||
serde_json.workspace = true
|
||||
const_format.workspace = true
|
||||
anyhow.workspace = true
|
||||
bytes.workspace = true
|
||||
@@ -14,6 +15,7 @@ byteorder.workspace = true
|
||||
utils.workspace = true
|
||||
postgres_ffi.workspace = true
|
||||
enum-map.workspace = true
|
||||
serde_json.workspace = true
|
||||
strum.workspace = true
|
||||
strum_macros.workspace = true
|
||||
|
||||
workspace_hack.workspace = true
|
||||
|
||||
@@ -7,6 +7,7 @@ use std::{
|
||||
use byteorder::{BigEndian, ReadBytesExt};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_with::{serde_as, DisplayFromStr};
|
||||
use strum_macros;
|
||||
use utils::{
|
||||
history_buffer::HistoryBufferWithDropCounter,
|
||||
id::{NodeId, TenantId, TimelineId},
|
||||
@@ -18,11 +19,23 @@ use anyhow::bail;
|
||||
use bytes::{BufMut, Bytes, BytesMut};
|
||||
|
||||
/// A state of a tenant in pageserver's memory.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
#[derive(
|
||||
Clone,
|
||||
PartialEq,
|
||||
Eq,
|
||||
serde::Serialize,
|
||||
serde::Deserialize,
|
||||
strum_macros::Display,
|
||||
strum_macros::EnumString,
|
||||
strum_macros::EnumVariantNames,
|
||||
strum_macros::AsRefStr,
|
||||
strum_macros::IntoStaticStr,
|
||||
)]
|
||||
#[serde(tag = "slug", content = "data")]
|
||||
pub enum TenantState {
|
||||
// This tenant is being loaded from local disk
|
||||
/// This tenant is being loaded from local disk
|
||||
Loading,
|
||||
// This tenant is being downloaded from cloud storage.
|
||||
/// This tenant is being downloaded from cloud storage.
|
||||
Attaching,
|
||||
/// Tenant is fully operational
|
||||
Active,
|
||||
@@ -31,15 +44,7 @@ pub enum TenantState {
|
||||
Stopping,
|
||||
/// A tenant is recognized by the pageserver, but can no longer be used for
|
||||
/// any operations, because it failed to be activated.
|
||||
Broken,
|
||||
}
|
||||
|
||||
pub mod state {
|
||||
pub const LOADING: &str = "loading";
|
||||
pub const ATTACHING: &str = "attaching";
|
||||
pub const ACTIVE: &str = "active";
|
||||
pub const STOPPING: &str = "stopping";
|
||||
pub const BROKEN: &str = "broken";
|
||||
Broken { reason: String, backtrace: String },
|
||||
}
|
||||
|
||||
impl TenantState {
|
||||
@@ -49,17 +54,26 @@ impl TenantState {
|
||||
Self::Attaching => true,
|
||||
Self::Active => false,
|
||||
Self::Stopping => false,
|
||||
Self::Broken => false,
|
||||
Self::Broken { .. } => false,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn as_str(&self) -> &'static str {
|
||||
pub fn broken_from_reason(reason: String) -> Self {
|
||||
let backtrace_str: String = format!("{}", std::backtrace::Backtrace::force_capture());
|
||||
Self::Broken {
|
||||
reason,
|
||||
backtrace: backtrace_str,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for TenantState {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
TenantState::Loading => state::LOADING,
|
||||
TenantState::Attaching => state::ATTACHING,
|
||||
TenantState::Active => state::ACTIVE,
|
||||
TenantState::Stopping => state::STOPPING,
|
||||
TenantState::Broken => state::BROKEN,
|
||||
Self::Broken { reason, backtrace } if !reason.is_empty() => {
|
||||
write!(f, "Broken due to: {reason}. Backtrace:\n{backtrace}")
|
||||
}
|
||||
_ => write!(f, "{self}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -121,6 +135,7 @@ pub struct TenantCreateRequest {
|
||||
// For now, this field is not even documented in the openapi_spec.yml.
|
||||
pub eviction_policy: Option<serde_json::Value>,
|
||||
pub min_resident_size_override: Option<u64>,
|
||||
pub evictions_low_residence_duration_metric_threshold: Option<String>,
|
||||
}
|
||||
|
||||
#[serde_as]
|
||||
@@ -167,6 +182,7 @@ pub struct TenantConfigRequest {
|
||||
// For now, this field is not even documented in the openapi_spec.yml.
|
||||
pub eviction_policy: Option<serde_json::Value>,
|
||||
pub min_resident_size_override: Option<u64>,
|
||||
pub evictions_low_residence_duration_metric_threshold: Option<String>,
|
||||
}
|
||||
|
||||
impl TenantConfigRequest {
|
||||
@@ -188,6 +204,7 @@ impl TenantConfigRequest {
|
||||
trace_read_requests: None,
|
||||
eviction_policy: None,
|
||||
min_resident_size_override: None,
|
||||
evictions_low_residence_duration_metric_threshold: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -615,6 +632,7 @@ impl PagestreamBeMessage {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use bytes::Buf;
|
||||
use serde_json::json;
|
||||
|
||||
use super::*;
|
||||
|
||||
@@ -665,4 +683,57 @@ mod tests {
|
||||
assert!(msg == reconstructed);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_tenantinfo_serde() {
|
||||
// Test serialization/deserialization of TenantInfo
|
||||
let original_active = TenantInfo {
|
||||
id: TenantId::generate(),
|
||||
state: TenantState::Active,
|
||||
current_physical_size: Some(42),
|
||||
has_in_progress_downloads: Some(false),
|
||||
};
|
||||
let expected_active = json!({
|
||||
"id": original_active.id.to_string(),
|
||||
"state": {
|
||||
"slug": "Active",
|
||||
},
|
||||
"current_physical_size": 42,
|
||||
"has_in_progress_downloads": false,
|
||||
});
|
||||
|
||||
let original_broken = TenantInfo {
|
||||
id: TenantId::generate(),
|
||||
state: TenantState::Broken {
|
||||
reason: "reason".into(),
|
||||
backtrace: "backtrace info".into(),
|
||||
},
|
||||
current_physical_size: Some(42),
|
||||
has_in_progress_downloads: Some(false),
|
||||
};
|
||||
let expected_broken = json!({
|
||||
"id": original_broken.id.to_string(),
|
||||
"state": {
|
||||
"slug": "Broken",
|
||||
"data": {
|
||||
"backtrace": "backtrace info",
|
||||
"reason": "reason",
|
||||
}
|
||||
},
|
||||
"current_physical_size": 42,
|
||||
"has_in_progress_downloads": false,
|
||||
});
|
||||
|
||||
assert_eq!(
|
||||
serde_json::to_value(&original_active).unwrap(),
|
||||
expected_active
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
serde_json::to_value(&original_broken).unwrap(),
|
||||
expected_broken
|
||||
);
|
||||
assert!(format!("{:?}", &original_broken.state).contains("reason"));
|
||||
assert!(format!("{:?}", &original_broken.state).contains("backtrace info"));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,7 +5,7 @@ use std::path::PathBuf;
|
||||
use std::process::Command;
|
||||
|
||||
use anyhow::{anyhow, Context};
|
||||
use bindgen::callbacks::ParseCallbacks;
|
||||
use bindgen::callbacks::{DeriveInfo, ParseCallbacks};
|
||||
|
||||
#[derive(Debug)]
|
||||
struct PostgresFfiCallbacks;
|
||||
@@ -20,7 +20,7 @@ impl ParseCallbacks for PostgresFfiCallbacks {
|
||||
|
||||
// Add any custom #[derive] attributes to the data structures that bindgen
|
||||
// creates.
|
||||
fn add_derives(&self, name: &str) -> Vec<String> {
|
||||
fn add_derives(&self, derive_info: &DeriveInfo) -> Vec<String> {
|
||||
// This is the list of data structures that we want to serialize/deserialize.
|
||||
let serde_list = [
|
||||
"XLogRecord",
|
||||
@@ -31,7 +31,7 @@ impl ParseCallbacks for PostgresFfiCallbacks {
|
||||
"ControlFileData",
|
||||
];
|
||||
|
||||
if serde_list.contains(&name) {
|
||||
if serde_list.contains(&derive_info.name) {
|
||||
vec![
|
||||
"Default".into(), // Default allows us to easily fill the padding fields with 0.
|
||||
"Serialize".into(),
|
||||
|
||||
@@ -204,12 +204,7 @@ async fn upload_s3_data(
|
||||
let data = format!("remote blob data {i}").into_bytes();
|
||||
let data_len = data.len();
|
||||
task_client
|
||||
.upload(
|
||||
Box::new(std::io::Cursor::new(data)),
|
||||
data_len,
|
||||
&blob_path,
|
||||
None,
|
||||
)
|
||||
.upload(std::io::Cursor::new(data), data_len, &blob_path, None)
|
||||
.await?;
|
||||
|
||||
Ok::<_, anyhow::Error>((blob_prefix, blob_path))
|
||||
|
||||
@@ -14,4 +14,5 @@ tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }
|
||||
tracing.workspace = true
|
||||
tracing-opentelemetry.workspace = true
|
||||
tracing-subscriber.workspace = true
|
||||
workspace_hack = { version = "0.1", path = "../../workspace_hack" }
|
||||
|
||||
workspace_hack.workspace = true
|
||||
|
||||
@@ -33,7 +33,7 @@ serde_with.workspace = true
|
||||
strum.workspace = true
|
||||
strum_macros.workspace = true
|
||||
url.workspace = true
|
||||
uuid = { version = "1.2", features = ["v4", "serde"] }
|
||||
uuid.workspace = true
|
||||
|
||||
metrics.workspace = true
|
||||
workspace_hack.workspace = true
|
||||
|
||||
@@ -33,7 +33,7 @@ fn build_layer_map(filename_dump: PathBuf) -> LayerMap<LayerDescriptor> {
|
||||
min_lsn = min(min_lsn, lsn_range.start);
|
||||
max_lsn = max(max_lsn, Lsn(lsn_range.end.0 - 1));
|
||||
|
||||
updates.insert_historic(Arc::new(layer));
|
||||
updates.insert_historic(Arc::new(layer)).unwrap();
|
||||
}
|
||||
|
||||
println!("min: {min_lsn}, max: {max_lsn}");
|
||||
@@ -215,7 +215,7 @@ fn bench_sequential(c: &mut Criterion) {
|
||||
is_incremental: false,
|
||||
short_id: format!("Layer {}", i),
|
||||
};
|
||||
updates.insert_historic(Arc::new(layer));
|
||||
updates.insert_historic(Arc::new(layer)).unwrap();
|
||||
}
|
||||
updates.flush();
|
||||
println!("Finished layer map init in {:?}", now.elapsed());
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
|
||||
use anyhow::{anyhow, bail, ensure, Context, Result};
|
||||
use remote_storage::{RemotePath, RemoteStorageConfig};
|
||||
use serde::de::IntoDeserializer;
|
||||
use std::env;
|
||||
use storage_broker::Uri;
|
||||
use utils::crashsafe::path_with_suffix_extension;
|
||||
@@ -62,7 +63,6 @@ pub mod defaults {
|
||||
pub const DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL: &str = "1 hour";
|
||||
pub const DEFAULT_METRIC_COLLECTION_ENDPOINT: Option<reqwest::Url> = None;
|
||||
pub const DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL: &str = "10 min";
|
||||
pub const DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD: &str = "24 hour";
|
||||
|
||||
///
|
||||
/// Default built-in configuration file.
|
||||
@@ -91,7 +91,6 @@ pub mod defaults {
|
||||
#cached_metric_collection_interval = '{DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL}'
|
||||
#synthetic_size_calculation_interval = '{DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL}'
|
||||
|
||||
#evictions_low_residence_duration_metric_threshold = '{DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD}'
|
||||
|
||||
#disk_usage_based_eviction = {{ max_usage_pct = .., min_avail_bytes = .., period = "10s"}}
|
||||
|
||||
@@ -108,6 +107,7 @@ pub mod defaults {
|
||||
#pitr_interval = '{DEFAULT_PITR_INTERVAL}'
|
||||
|
||||
#min_resident_size_override = .. # in bytes
|
||||
#evictions_low_residence_duration_metric_threshold = '{DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD}'
|
||||
|
||||
# [remote_storage]
|
||||
|
||||
@@ -182,9 +182,6 @@ pub struct PageServerConf {
|
||||
pub metric_collection_endpoint: Option<Url>,
|
||||
pub synthetic_size_calculation_interval: Duration,
|
||||
|
||||
// See the corresponding metric's help string.
|
||||
pub evictions_low_residence_duration_metric_threshold: Duration,
|
||||
|
||||
pub disk_usage_based_eviction: Option<DiskUsageEvictionTaskConfig>,
|
||||
|
||||
pub test_remote_failures: u64,
|
||||
@@ -257,8 +254,6 @@ struct PageServerConfigBuilder {
|
||||
metric_collection_endpoint: BuilderValue<Option<Url>>,
|
||||
synthetic_size_calculation_interval: BuilderValue<Duration>,
|
||||
|
||||
evictions_low_residence_duration_metric_threshold: BuilderValue<Duration>,
|
||||
|
||||
disk_usage_based_eviction: BuilderValue<Option<DiskUsageEvictionTaskConfig>>,
|
||||
|
||||
test_remote_failures: BuilderValue<u64>,
|
||||
@@ -316,11 +311,6 @@ impl Default for PageServerConfigBuilder {
|
||||
.expect("cannot parse default synthetic size calculation interval")),
|
||||
metric_collection_endpoint: Set(DEFAULT_METRIC_COLLECTION_ENDPOINT),
|
||||
|
||||
evictions_low_residence_duration_metric_threshold: Set(humantime::parse_duration(
|
||||
DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD,
|
||||
)
|
||||
.expect("cannot parse DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD")),
|
||||
|
||||
disk_usage_based_eviction: Set(None),
|
||||
|
||||
test_remote_failures: Set(0),
|
||||
@@ -438,10 +428,6 @@ impl PageServerConfigBuilder {
|
||||
self.test_remote_failures = BuilderValue::Set(fail_first);
|
||||
}
|
||||
|
||||
pub fn evictions_low_residence_duration_metric_threshold(&mut self, value: Duration) {
|
||||
self.evictions_low_residence_duration_metric_threshold = BuilderValue::Set(value);
|
||||
}
|
||||
|
||||
pub fn disk_usage_based_eviction(&mut self, value: Option<DiskUsageEvictionTaskConfig>) {
|
||||
self.disk_usage_based_eviction = BuilderValue::Set(value);
|
||||
}
|
||||
@@ -525,11 +511,6 @@ impl PageServerConfigBuilder {
|
||||
synthetic_size_calculation_interval: self
|
||||
.synthetic_size_calculation_interval
|
||||
.ok_or(anyhow!("missing synthetic_size_calculation_interval"))?,
|
||||
evictions_low_residence_duration_metric_threshold: self
|
||||
.evictions_low_residence_duration_metric_threshold
|
||||
.ok_or(anyhow!(
|
||||
"missing evictions_low_residence_duration_metric_threshold"
|
||||
))?,
|
||||
disk_usage_based_eviction: self
|
||||
.disk_usage_based_eviction
|
||||
.ok_or(anyhow!("missing disk_usage_based_eviction"))?,
|
||||
@@ -721,12 +702,12 @@ impl PageServerConf {
|
||||
"synthetic_size_calculation_interval" =>
|
||||
builder.synthetic_size_calculation_interval(parse_toml_duration(key, item)?),
|
||||
"test_remote_failures" => builder.test_remote_failures(parse_toml_u64(key, item)?),
|
||||
"evictions_low_residence_duration_metric_threshold" => builder.evictions_low_residence_duration_metric_threshold(parse_toml_duration(key, item)?),
|
||||
"disk_usage_based_eviction" => {
|
||||
tracing::info!("disk_usage_based_eviction: {:#?}", &item);
|
||||
builder.disk_usage_based_eviction(
|
||||
toml_edit::de::from_item(item.clone())
|
||||
.context("parse disk_usage_based_eviction")?)
|
||||
deserialize_from_item("disk_usage_based_eviction", item)
|
||||
.context("parse disk_usage_based_eviction")?
|
||||
)
|
||||
},
|
||||
"ondemand_download_behavior_treat_error_as_warn" => builder.ondemand_download_behavior_treat_error_as_warn(parse_toml_bool(key, item)?),
|
||||
_ => bail!("unrecognized pageserver option '{key}'"),
|
||||
@@ -827,18 +808,25 @@ impl PageServerConf {
|
||||
|
||||
if let Some(eviction_policy) = item.get("eviction_policy") {
|
||||
t_conf.eviction_policy = Some(
|
||||
toml_edit::de::from_item(eviction_policy.clone())
|
||||
deserialize_from_item("eviction_policy", eviction_policy)
|
||||
.context("parse eviction_policy")?,
|
||||
);
|
||||
}
|
||||
|
||||
if let Some(item) = item.get("min_resident_size_override") {
|
||||
t_conf.min_resident_size_override = Some(
|
||||
toml_edit::de::from_item(item.clone())
|
||||
deserialize_from_item("min_resident_size_override", item)
|
||||
.context("parse min_resident_size_override")?,
|
||||
);
|
||||
}
|
||||
|
||||
if let Some(item) = item.get("evictions_low_residence_duration_metric_threshold") {
|
||||
t_conf.evictions_low_residence_duration_metric_threshold = Some(parse_toml_duration(
|
||||
"evictions_low_residence_duration_metric_threshold",
|
||||
item,
|
||||
)?);
|
||||
}
|
||||
|
||||
Ok(t_conf)
|
||||
}
|
||||
|
||||
@@ -877,10 +865,6 @@ impl PageServerConf {
|
||||
cached_metric_collection_interval: Duration::from_secs(60 * 60),
|
||||
metric_collection_endpoint: defaults::DEFAULT_METRIC_COLLECTION_ENDPOINT,
|
||||
synthetic_size_calculation_interval: Duration::from_secs(60),
|
||||
evictions_low_residence_duration_metric_threshold: humantime::parse_duration(
|
||||
defaults::DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD,
|
||||
)
|
||||
.unwrap(),
|
||||
disk_usage_based_eviction: None,
|
||||
test_remote_failures: 0,
|
||||
ondemand_download_behavior_treat_error_as_warn: false,
|
||||
@@ -938,6 +922,18 @@ where
|
||||
})
|
||||
}
|
||||
|
||||
fn deserialize_from_item<T>(name: &str, item: &Item) -> anyhow::Result<T>
|
||||
where
|
||||
T: serde::de::DeserializeOwned,
|
||||
{
|
||||
// ValueDeserializer::new is not public, so use the ValueDeserializer's documented way
|
||||
let deserializer = match item.clone().into_value() {
|
||||
Ok(value) => value.into_deserializer(),
|
||||
Err(item) => anyhow::bail!("toml_edit::Item '{item}' is not a toml_edit::Value"),
|
||||
};
|
||||
T::deserialize(deserializer).with_context(|| format!("deserializing item for node {name}"))
|
||||
}
|
||||
|
||||
/// Configurable semaphore permits setting.
|
||||
///
|
||||
/// Does not allow semaphore permits to be zero, because at runtime initially zero permits and empty
|
||||
@@ -1004,9 +1000,10 @@ mod tests {
|
||||
|
||||
use remote_storage::{RemoteStorageKind, S3Config};
|
||||
use tempfile::{tempdir, TempDir};
|
||||
use utils::serde_percent::Percent;
|
||||
|
||||
use super::*;
|
||||
use crate::DEFAULT_PG_VERSION;
|
||||
use crate::{tenant::config::EvictionPolicy, DEFAULT_PG_VERSION};
|
||||
|
||||
const ALL_BASE_VALUES_TOML: &str = r#"
|
||||
# Initial configuration file created by 'pageserver --init'
|
||||
@@ -1029,8 +1026,6 @@ cached_metric_collection_interval = '22200 s'
|
||||
metric_collection_endpoint = 'http://localhost:80/metrics'
|
||||
synthetic_size_calculation_interval = '333 s'
|
||||
|
||||
evictions_low_residence_duration_metric_threshold = '444 s'
|
||||
|
||||
log_format = 'json'
|
||||
|
||||
"#;
|
||||
@@ -1087,9 +1082,6 @@ log_format = 'json'
|
||||
synthetic_size_calculation_interval: humantime::parse_duration(
|
||||
defaults::DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL
|
||||
)?,
|
||||
evictions_low_residence_duration_metric_threshold: humantime::parse_duration(
|
||||
defaults::DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD
|
||||
)?,
|
||||
disk_usage_based_eviction: None,
|
||||
test_remote_failures: 0,
|
||||
ondemand_download_behavior_treat_error_as_warn: false,
|
||||
@@ -1144,7 +1136,6 @@ log_format = 'json'
|
||||
cached_metric_collection_interval: Duration::from_secs(22200),
|
||||
metric_collection_endpoint: Some(Url::parse("http://localhost:80/metrics")?),
|
||||
synthetic_size_calculation_interval: Duration::from_secs(333),
|
||||
evictions_low_residence_duration_metric_threshold: Duration::from_secs(444),
|
||||
disk_usage_based_eviction: None,
|
||||
test_remote_failures: 0,
|
||||
ondemand_download_behavior_treat_error_as_warn: false,
|
||||
@@ -1310,6 +1301,71 @@ trace_read_requests = {trace_read_requests}"#,
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn eviction_pageserver_config_parse() -> anyhow::Result<()> {
|
||||
let tempdir = tempdir()?;
|
||||
let (workdir, pg_distrib_dir) = prepare_fs(&tempdir)?;
|
||||
|
||||
let pageserver_conf_toml = format!(
|
||||
r#"pg_distrib_dir = "{}"
|
||||
metric_collection_endpoint = "http://sample.url"
|
||||
metric_collection_interval = "10min"
|
||||
id = 222
|
||||
|
||||
[disk_usage_based_eviction]
|
||||
max_usage_pct = 80
|
||||
min_avail_bytes = 0
|
||||
period = "10s"
|
||||
|
||||
[tenant_config]
|
||||
evictions_low_residence_duration_metric_threshold = "20m"
|
||||
|
||||
[tenant_config.eviction_policy]
|
||||
kind = "LayerAccessThreshold"
|
||||
period = "20m"
|
||||
threshold = "20m"
|
||||
"#,
|
||||
pg_distrib_dir.display(),
|
||||
);
|
||||
let toml: Document = pageserver_conf_toml.parse()?;
|
||||
let conf = PageServerConf::parse_and_validate(&toml, &workdir)?;
|
||||
|
||||
assert_eq!(conf.pg_distrib_dir, pg_distrib_dir);
|
||||
assert_eq!(
|
||||
conf.metric_collection_endpoint,
|
||||
Some("http://sample.url".parse().unwrap())
|
||||
);
|
||||
assert_eq!(
|
||||
conf.metric_collection_interval,
|
||||
Duration::from_secs(10 * 60)
|
||||
);
|
||||
assert_eq!(
|
||||
conf.default_tenant_conf
|
||||
.evictions_low_residence_duration_metric_threshold,
|
||||
Duration::from_secs(20 * 60)
|
||||
);
|
||||
assert_eq!(conf.id, NodeId(222));
|
||||
assert_eq!(
|
||||
conf.disk_usage_based_eviction,
|
||||
Some(DiskUsageEvictionTaskConfig {
|
||||
max_usage_pct: Percent::new(80).unwrap(),
|
||||
min_avail_bytes: 0,
|
||||
period: Duration::from_secs(10),
|
||||
#[cfg(feature = "testing")]
|
||||
mock_statvfs: None,
|
||||
})
|
||||
);
|
||||
match &conf.default_tenant_conf.eviction_policy {
|
||||
EvictionPolicy::NoEviction => panic!("Unexpected eviction opolicy tenant settings"),
|
||||
EvictionPolicy::LayerAccessThreshold(eviction_thresold) => {
|
||||
assert_eq!(eviction_thresold.period, Duration::from_secs(20 * 60));
|
||||
assert_eq!(eviction_thresold.threshold, Duration::from_secs(20 * 60));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn prepare_fs(tempdir: &TempDir) -> anyhow::Result<(PathBuf, PathBuf)> {
|
||||
let tempdir_path = tempdir.path();
|
||||
|
||||
|
||||
@@ -829,12 +829,9 @@ components:
|
||||
type: object
|
||||
required:
|
||||
- id
|
||||
- state
|
||||
properties:
|
||||
id:
|
||||
type: string
|
||||
state:
|
||||
type: string
|
||||
current_physical_size:
|
||||
type: integer
|
||||
has_in_progress_downloads:
|
||||
|
||||
@@ -465,7 +465,7 @@ async fn tenant_list_handler(request: Request<Body>) -> Result<Response<Body>, A
|
||||
.iter()
|
||||
.map(|(id, state)| TenantInfo {
|
||||
id: *id,
|
||||
state: *state,
|
||||
state: state.clone(),
|
||||
current_physical_size: None,
|
||||
has_in_progress_downloads: Some(state.has_in_progress_downloads()),
|
||||
})
|
||||
@@ -490,7 +490,7 @@ async fn tenant_status(request: Request<Body>) -> Result<Response<Body>, ApiErro
|
||||
let state = tenant.current_state();
|
||||
Ok(TenantInfo {
|
||||
id: tenant_id,
|
||||
state,
|
||||
state: state.clone(),
|
||||
current_physical_size: Some(current_physical_size),
|
||||
has_in_progress_downloads: Some(state.has_in_progress_downloads()),
|
||||
})
|
||||
@@ -781,6 +781,19 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
|
||||
|
||||
tenant_conf.min_resident_size_override = request_data.min_resident_size_override;
|
||||
|
||||
if let Some(evictions_low_residence_duration_metric_threshold) =
|
||||
request_data.evictions_low_residence_duration_metric_threshold
|
||||
{
|
||||
tenant_conf.evictions_low_residence_duration_metric_threshold = Some(
|
||||
humantime::parse_duration(&evictions_low_residence_duration_metric_threshold)
|
||||
.with_context(bad_duration(
|
||||
"evictions_low_residence_duration_metric_threshold",
|
||||
&evictions_low_residence_duration_metric_threshold,
|
||||
))
|
||||
.map_err(ApiError::BadRequest)?,
|
||||
);
|
||||
}
|
||||
|
||||
let target_tenant_id = request_data
|
||||
.new_tenant_id
|
||||
.map(TenantId::from)
|
||||
@@ -914,6 +927,19 @@ async fn update_tenant_config_handler(
|
||||
|
||||
tenant_conf.min_resident_size_override = request_data.min_resident_size_override;
|
||||
|
||||
if let Some(evictions_low_residence_duration_metric_threshold) =
|
||||
request_data.evictions_low_residence_duration_metric_threshold
|
||||
{
|
||||
tenant_conf.evictions_low_residence_duration_metric_threshold = Some(
|
||||
humantime::parse_duration(&evictions_low_residence_duration_metric_threshold)
|
||||
.with_context(bad_duration(
|
||||
"evictions_low_residence_duration_metric_threshold",
|
||||
&evictions_low_residence_duration_metric_threshold,
|
||||
))
|
||||
.map_err(ApiError::BadRequest)?,
|
||||
);
|
||||
}
|
||||
|
||||
let state = get_state(&request);
|
||||
mgr::set_new_tenant_config(state.conf, tenant_conf, tenant_id)
|
||||
.instrument(info_span!("tenant_config", tenant = ?tenant_id))
|
||||
@@ -931,7 +957,7 @@ async fn handle_tenant_break(r: Request<Body>) -> Result<Response<Body>, ApiErro
|
||||
.await
|
||||
.map_err(|_| ApiError::Conflict(String::from("no active tenant found")))?;
|
||||
|
||||
tenant.set_broken("broken from test");
|
||||
tenant.set_broken("broken from test".to_owned());
|
||||
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
@@ -6,7 +6,8 @@ use metrics::{
|
||||
UIntGauge, UIntGaugeVec,
|
||||
};
|
||||
use once_cell::sync::Lazy;
|
||||
use pageserver_api::models::state;
|
||||
use pageserver_api::models::TenantState;
|
||||
use strum::VariantNames;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
|
||||
/// Prometheus histogram buckets (in seconds) for operations in the critical
|
||||
@@ -147,15 +148,6 @@ static CURRENT_LOGICAL_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
|
||||
.expect("failed to define current logical size metric")
|
||||
});
|
||||
|
||||
// Metrics collected on tenant states.
|
||||
const TENANT_STATE_OPTIONS: &[&str] = &[
|
||||
state::LOADING,
|
||||
state::ATTACHING,
|
||||
state::ACTIVE,
|
||||
state::STOPPING,
|
||||
state::BROKEN,
|
||||
];
|
||||
|
||||
pub static TENANT_STATE_METRIC: Lazy<UIntGaugeVec> = Lazy::new(|| {
|
||||
register_uint_gauge_vec!(
|
||||
"pageserver_tenant_states_count",
|
||||
@@ -265,6 +257,22 @@ impl EvictionsWithLowResidenceDuration {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn change_threshold(
|
||||
&mut self,
|
||||
tenant_id: &str,
|
||||
timeline_id: &str,
|
||||
new_threshold: Duration,
|
||||
) {
|
||||
if new_threshold == self.threshold {
|
||||
return;
|
||||
}
|
||||
let mut with_new =
|
||||
EvictionsWithLowResidenceDurationBuilder::new(self.data_source, new_threshold)
|
||||
.build(tenant_id, timeline_id);
|
||||
std::mem::swap(self, &mut with_new);
|
||||
with_new.remove(tenant_id, timeline_id);
|
||||
}
|
||||
|
||||
// This could be a `Drop` impl, but, we need the `tenant_id` and `timeline_id`.
|
||||
fn remove(&mut self, tenant_id: &str, timeline_id: &str) {
|
||||
let Some(_counter) = self.counter.take() else {
|
||||
@@ -597,7 +605,7 @@ pub struct TimelineMetrics {
|
||||
pub num_persistent_files_created: IntCounter,
|
||||
pub persistent_bytes_written: IntCounter,
|
||||
pub evictions: IntCounter,
|
||||
pub evictions_with_low_residence_duration: EvictionsWithLowResidenceDuration,
|
||||
pub evictions_with_low_residence_duration: std::sync::RwLock<EvictionsWithLowResidenceDuration>,
|
||||
}
|
||||
|
||||
impl TimelineMetrics {
|
||||
@@ -664,7 +672,9 @@ impl TimelineMetrics {
|
||||
num_persistent_files_created,
|
||||
persistent_bytes_written,
|
||||
evictions,
|
||||
evictions_with_low_residence_duration,
|
||||
evictions_with_low_residence_duration: std::sync::RwLock::new(
|
||||
evictions_with_low_residence_duration,
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -683,6 +693,8 @@ impl Drop for TimelineMetrics {
|
||||
let _ = PERSISTENT_BYTES_WRITTEN.remove_label_values(&[tenant_id, timeline_id]);
|
||||
let _ = EVICTIONS.remove_label_values(&[tenant_id, timeline_id]);
|
||||
self.evictions_with_low_residence_duration
|
||||
.write()
|
||||
.unwrap()
|
||||
.remove(tenant_id, timeline_id);
|
||||
for op in STORAGE_TIME_OPERATIONS {
|
||||
let _ =
|
||||
@@ -707,7 +719,7 @@ impl Drop for TimelineMetrics {
|
||||
pub fn remove_tenant_metrics(tenant_id: &TenantId) {
|
||||
let tid = tenant_id.to_string();
|
||||
let _ = TENANT_SYNTHETIC_SIZE_METRIC.remove_label_values(&[&tid]);
|
||||
for state in TENANT_STATE_OPTIONS {
|
||||
for state in TenantState::VARIANTS {
|
||||
let _ = TENANT_STATE_METRIC.remove_label_values(&[&tid, state]);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -65,7 +65,7 @@ fn copyin_stream(pgb: &mut PostgresBackendTCP) -> impl Stream<Item = io::Result<
|
||||
|
||||
_ = task_mgr::shutdown_watcher() => {
|
||||
// We were requested to shut down.
|
||||
let msg = format!("pageserver is shutting down");
|
||||
let msg = "pageserver is shutting down".to_string();
|
||||
let _ = pgb.write_message_noflush(&BeMessage::ErrorResponse(&msg, None));
|
||||
Err(QueryError::Other(anyhow::anyhow!(msg)))
|
||||
}
|
||||
|
||||
@@ -267,7 +267,10 @@ impl UninitializedTimeline<'_> {
|
||||
.await
|
||||
.context("Failed to flush after basebackup import")?;
|
||||
|
||||
self.initialize(ctx)
|
||||
// Initialize without loading the layer map. We started with an empty layer map, and already
|
||||
// updated it for the layers that we created during the import.
|
||||
let mut timelines = self.owning_tenant.timelines.lock().unwrap();
|
||||
self.initialize_with_lock(ctx, &mut timelines, false, true)
|
||||
}
|
||||
|
||||
fn raw_timeline(&self) -> anyhow::Result<&Arc<Timeline>> {
|
||||
@@ -619,7 +622,7 @@ impl Tenant {
|
||||
match tenant_clone.attach(ctx).await {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
tenant_clone.set_broken(&e.to_string());
|
||||
tenant_clone.set_broken(e.to_string());
|
||||
error!("error attaching tenant: {:?}", e);
|
||||
}
|
||||
}
|
||||
@@ -827,7 +830,10 @@ impl Tenant {
|
||||
pub fn create_broken_tenant(conf: &'static PageServerConf, tenant_id: TenantId) -> Arc<Tenant> {
|
||||
let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenant_id));
|
||||
Arc::new(Tenant::new(
|
||||
TenantState::Broken,
|
||||
TenantState::Broken {
|
||||
reason: "create_broken_tenant".into(),
|
||||
backtrace: String::new(),
|
||||
},
|
||||
conf,
|
||||
TenantConfOpt::default(),
|
||||
wal_redo_manager,
|
||||
@@ -888,7 +894,7 @@ impl Tenant {
|
||||
match tenant_clone.load(&ctx).await {
|
||||
Ok(()) => {}
|
||||
Err(err) => {
|
||||
tenant_clone.set_broken(&err.to_string());
|
||||
tenant_clone.set_broken(err.to_string());
|
||||
error!("could not load tenant {tenant_id}: {err:?}");
|
||||
}
|
||||
}
|
||||
@@ -1440,7 +1446,7 @@ impl Tenant {
|
||||
}
|
||||
|
||||
pub fn current_state(&self) -> TenantState {
|
||||
*self.state.borrow()
|
||||
self.state.borrow().clone()
|
||||
}
|
||||
|
||||
pub fn is_active(&self) -> bool {
|
||||
@@ -1451,15 +1457,15 @@ impl Tenant {
|
||||
fn activate(&self, ctx: &RequestContext) -> anyhow::Result<()> {
|
||||
let mut result = Ok(());
|
||||
self.state.send_modify(|current_state| {
|
||||
match *current_state {
|
||||
match &*current_state {
|
||||
TenantState::Active => {
|
||||
// activate() was called on an already Active tenant. Shouldn't happen.
|
||||
result = Err(anyhow::anyhow!("Tenant is already active"));
|
||||
}
|
||||
TenantState::Broken => {
|
||||
TenantState::Broken { reason, .. } => {
|
||||
// This shouldn't happen either
|
||||
result = Err(anyhow::anyhow!(
|
||||
"Could not activate tenant because it is in broken state"
|
||||
"Could not activate tenant because it is in broken state due to: {reason}",
|
||||
));
|
||||
}
|
||||
TenantState::Stopping => {
|
||||
@@ -1493,7 +1499,10 @@ impl Tenant {
|
||||
timeline.timeline_id, e
|
||||
);
|
||||
timeline.set_state(TimelineState::Broken);
|
||||
*current_state = TenantState::Broken;
|
||||
*current_state = TenantState::broken_from_reason(format!(
|
||||
"failed to activate timeline {}: {}",
|
||||
timeline.timeline_id, e
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1506,7 +1515,7 @@ impl Tenant {
|
||||
/// Change tenant status to Stopping, to mark that it is being shut down
|
||||
pub fn set_stopping(&self) {
|
||||
self.state.send_modify(|current_state| {
|
||||
match *current_state {
|
||||
match current_state {
|
||||
TenantState::Active | TenantState::Loading | TenantState::Attaching => {
|
||||
*current_state = TenantState::Stopping;
|
||||
|
||||
@@ -1522,8 +1531,8 @@ impl Tenant {
|
||||
timeline.set_state(TimelineState::Stopping);
|
||||
}
|
||||
}
|
||||
TenantState::Broken => {
|
||||
info!("Cannot set tenant to Stopping state, it is already in Broken state");
|
||||
TenantState::Broken { reason, .. } => {
|
||||
info!("Cannot set tenant to Stopping state, it is in Broken state due to: {reason}");
|
||||
}
|
||||
TenantState::Stopping => {
|
||||
// The tenant was detached, or system shutdown was requested, while we were
|
||||
@@ -1534,7 +1543,7 @@ impl Tenant {
|
||||
});
|
||||
}
|
||||
|
||||
pub fn set_broken(&self, reason: &str) {
|
||||
pub fn set_broken(&self, reason: String) {
|
||||
self.state.send_modify(|current_state| {
|
||||
match *current_state {
|
||||
TenantState::Active => {
|
||||
@@ -1542,24 +1551,24 @@ impl Tenant {
|
||||
// while loading or attaching a tenant. A tenant that has already been
|
||||
// activated should never be marked as broken. We cope with it the best
|
||||
// we can, but it shouldn't happen.
|
||||
*current_state = TenantState::Broken;
|
||||
warn!("Changing Active tenant to Broken state, reason: {}", reason);
|
||||
*current_state = TenantState::broken_from_reason(reason);
|
||||
}
|
||||
TenantState::Broken => {
|
||||
TenantState::Broken { .. } => {
|
||||
// This shouldn't happen either
|
||||
warn!("Tenant is already in Broken state");
|
||||
}
|
||||
TenantState::Stopping => {
|
||||
// This shouldn't happen either
|
||||
*current_state = TenantState::Broken;
|
||||
warn!(
|
||||
"Marking Stopping tenant as Broken state, reason: {}",
|
||||
reason
|
||||
);
|
||||
*current_state = TenantState::broken_from_reason(reason);
|
||||
}
|
||||
TenantState::Loading | TenantState::Attaching => {
|
||||
info!("Setting tenant as Broken state, reason: {}", reason);
|
||||
*current_state = TenantState::Broken;
|
||||
*current_state = TenantState::broken_from_reason(reason);
|
||||
}
|
||||
}
|
||||
});
|
||||
@@ -1572,7 +1581,7 @@ impl Tenant {
|
||||
pub async fn wait_to_become_active(&self) -> anyhow::Result<()> {
|
||||
let mut receiver = self.state.subscribe();
|
||||
loop {
|
||||
let current_state = *receiver.borrow_and_update();
|
||||
let current_state = receiver.borrow_and_update().clone();
|
||||
match current_state {
|
||||
TenantState::Loading | TenantState::Attaching => {
|
||||
// in these states, there's a chance that we can reach ::Active
|
||||
@@ -1581,12 +1590,12 @@ impl Tenant {
|
||||
TenantState::Active { .. } => {
|
||||
return Ok(());
|
||||
}
|
||||
TenantState::Broken | TenantState::Stopping => {
|
||||
TenantState::Broken { .. } | TenantState::Stopping => {
|
||||
// There's no chance the tenant can transition back into ::Active
|
||||
anyhow::bail!(
|
||||
"Tenant {} will not become active. Current state: {:?}",
|
||||
self.tenant_id,
|
||||
current_state,
|
||||
¤t_state,
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -1726,6 +1735,13 @@ impl Tenant {
|
||||
|
||||
pub fn set_new_tenant_config(&self, new_tenant_conf: TenantConfOpt) {
|
||||
*self.tenant_conf.write().unwrap() = new_tenant_conf;
|
||||
// Don't hold self.timelines.lock() during the notifies.
|
||||
// There's no risk of deadlock right now, but there could be if we consolidate
|
||||
// mutexes in struct Timeline in the future.
|
||||
let timelines = self.list_timelines();
|
||||
for timeline in timelines {
|
||||
timeline.tenant_conf_updated();
|
||||
}
|
||||
}
|
||||
|
||||
fn create_timeline_data(
|
||||
@@ -1767,21 +1783,23 @@ impl Tenant {
|
||||
let (state, mut rx) = watch::channel(state);
|
||||
|
||||
tokio::spawn(async move {
|
||||
let current_state = *rx.borrow_and_update();
|
||||
let mut current_state: &'static str = From::from(&*rx.borrow_and_update());
|
||||
let tid = tenant_id.to_string();
|
||||
TENANT_STATE_METRIC
|
||||
.with_label_values(&[&tid, current_state.as_str()])
|
||||
.with_label_values(&[&tid, current_state])
|
||||
.inc();
|
||||
loop {
|
||||
match rx.changed().await {
|
||||
Ok(()) => {
|
||||
let new_state = *rx.borrow();
|
||||
let new_state: &'static str = From::from(&*rx.borrow_and_update());
|
||||
TENANT_STATE_METRIC
|
||||
.with_label_values(&[&tid, current_state.as_str()])
|
||||
.with_label_values(&[&tid, current_state])
|
||||
.dec();
|
||||
TENANT_STATE_METRIC
|
||||
.with_label_values(&[&tid, new_state.as_str()])
|
||||
.with_label_values(&[&tid, new_state])
|
||||
.inc();
|
||||
|
||||
current_state = new_state;
|
||||
}
|
||||
Err(_sender_dropped_error) => {
|
||||
info!("Tenant dropped the state updates sender, quitting waiting for tenant state change");
|
||||
@@ -1876,7 +1894,7 @@ impl Tenant {
|
||||
.to_string();
|
||||
|
||||
// Convert the config to a toml file.
|
||||
conf_content += &toml_edit::easy::to_string(&tenant_conf)?;
|
||||
conf_content += &toml_edit::ser::to_string(&tenant_conf)?;
|
||||
|
||||
let mut target_config_file = VirtualFile::open_with_options(
|
||||
target_config_path,
|
||||
@@ -2308,6 +2326,8 @@ impl Tenant {
|
||||
)
|
||||
})?;
|
||||
|
||||
// Initialize the timeline without loading the layer map, because we already updated the layer
|
||||
// map above, when we imported the datadir.
|
||||
let timeline = {
|
||||
let mut timelines = self.timelines.lock().unwrap();
|
||||
raw_timeline.initialize_with_lock(ctx, &mut timelines, false, true)?
|
||||
@@ -2802,6 +2822,9 @@ pub mod harness {
|
||||
trace_read_requests: Some(tenant_conf.trace_read_requests),
|
||||
eviction_policy: Some(tenant_conf.eviction_policy),
|
||||
min_resident_size_override: tenant_conf.min_resident_size_override,
|
||||
evictions_low_residence_duration_metric_threshold: Some(
|
||||
tenant_conf.evictions_low_residence_duration_metric_threshold,
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -39,6 +39,7 @@ pub mod defaults {
|
||||
pub const DEFAULT_WALRECEIVER_CONNECT_TIMEOUT: &str = "2 seconds";
|
||||
pub const DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT: &str = "3 seconds";
|
||||
pub const DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG: u64 = 10 * 1024 * 1024;
|
||||
pub const DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD: &str = "24 hour";
|
||||
}
|
||||
|
||||
/// Per-tenant configuration options
|
||||
@@ -93,6 +94,9 @@ pub struct TenantConf {
|
||||
pub trace_read_requests: bool,
|
||||
pub eviction_policy: EvictionPolicy,
|
||||
pub min_resident_size_override: Option<u64>,
|
||||
// See the corresponding metric's help string.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub evictions_low_residence_duration_metric_threshold: Duration,
|
||||
}
|
||||
|
||||
/// Same as TenantConf, but this struct preserves the information about
|
||||
@@ -164,6 +168,11 @@ pub struct TenantConfOpt {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
#[serde(default)]
|
||||
pub min_resident_size_override: Option<u64>,
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
#[serde(with = "humantime_serde")]
|
||||
#[serde(default)]
|
||||
pub evictions_low_residence_duration_metric_threshold: Option<Duration>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
@@ -228,6 +237,9 @@ impl TenantConfOpt {
|
||||
min_resident_size_override: self
|
||||
.min_resident_size_override
|
||||
.or(global_conf.min_resident_size_override),
|
||||
evictions_low_residence_duration_metric_threshold: self
|
||||
.evictions_low_residence_duration_metric_threshold
|
||||
.unwrap_or(global_conf.evictions_low_residence_duration_metric_threshold),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -260,6 +272,10 @@ impl Default for TenantConf {
|
||||
trace_read_requests: false,
|
||||
eviction_policy: EvictionPolicy::NoEviction,
|
||||
min_resident_size_override: None,
|
||||
evictions_low_residence_duration_metric_threshold: humantime::parse_duration(
|
||||
DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD,
|
||||
)
|
||||
.expect("cannot parse default evictions_low_residence_duration_metric_threshold"),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -275,9 +291,9 @@ mod tests {
|
||||
..TenantConfOpt::default()
|
||||
};
|
||||
|
||||
let toml_form = toml_edit::easy::to_string(&small_conf).unwrap();
|
||||
let toml_form = toml_edit::ser::to_string(&small_conf).unwrap();
|
||||
assert_eq!(toml_form, "gc_horizon = 42\n");
|
||||
assert_eq!(small_conf, toml_edit::easy::from_str(&toml_form).unwrap());
|
||||
assert_eq!(small_conf, toml_edit::de::from_str(&toml_form).unwrap());
|
||||
|
||||
let json_form = serde_json::to_string(&small_conf).unwrap();
|
||||
assert_eq!(json_form, "{\"gc_horizon\":42}");
|
||||
|
||||
@@ -52,7 +52,7 @@ use crate::metrics::NUM_ONDISK_LAYERS;
|
||||
use crate::repository::Key;
|
||||
use crate::tenant::storage_layer::InMemoryLayer;
|
||||
use crate::tenant::storage_layer::Layer;
|
||||
use anyhow::Result;
|
||||
use anyhow::{bail, Result};
|
||||
use std::collections::VecDeque;
|
||||
use std::ops::Range;
|
||||
use std::sync::Arc;
|
||||
@@ -126,7 +126,7 @@ where
|
||||
///
|
||||
/// Insert an on-disk layer.
|
||||
///
|
||||
pub fn insert_historic(&mut self, layer: Arc<L>) {
|
||||
pub fn insert_historic(&mut self, layer: Arc<L>) -> anyhow::Result<()> {
|
||||
self.layer_map.insert_historic_noflush(layer)
|
||||
}
|
||||
|
||||
@@ -274,17 +274,22 @@ where
|
||||
///
|
||||
/// Helper function for BatchedUpdates::insert_historic
|
||||
///
|
||||
pub(self) fn insert_historic_noflush(&mut self, layer: Arc<L>) {
|
||||
self.historic.insert(
|
||||
historic_layer_coverage::LayerKey::from(&*layer),
|
||||
Arc::clone(&layer),
|
||||
);
|
||||
pub(self) fn insert_historic_noflush(&mut self, layer: Arc<L>) -> anyhow::Result<()> {
|
||||
let key = historic_layer_coverage::LayerKey::from(&*layer);
|
||||
if self.historic.contains(&key) {
|
||||
bail!(
|
||||
"Attempt to insert duplicate layer {} in layer map",
|
||||
layer.short_id()
|
||||
);
|
||||
}
|
||||
self.historic.insert(key, Arc::clone(&layer));
|
||||
|
||||
if Self::is_l0(&layer) {
|
||||
self.l0_delta_layers.push(layer);
|
||||
}
|
||||
|
||||
NUM_ONDISK_LAYERS.inc();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
///
|
||||
@@ -838,7 +843,7 @@ mod tests {
|
||||
|
||||
let expected_in_counts = (1, usize::from(expected_l0));
|
||||
|
||||
map.batch_update().insert_historic(remote.clone());
|
||||
map.batch_update().insert_historic(remote.clone()).unwrap();
|
||||
assert_eq!(count_layer_in(&map, &remote), expected_in_counts);
|
||||
|
||||
let replaced = map
|
||||
|
||||
@@ -417,6 +417,14 @@ impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn contains(&self, layer_key: &LayerKey) -> bool {
|
||||
match self.buffer.get(layer_key) {
|
||||
Some(None) => false, // layer remove was buffered
|
||||
Some(_) => true, // layer insert was buffered
|
||||
None => self.layers.contains_key(layer_key), // no buffered ops for this layer
|
||||
}
|
||||
}
|
||||
|
||||
pub fn insert(&mut self, layer_key: LayerKey, value: Value) {
|
||||
self.buffer.insert(layer_key, Some(value));
|
||||
}
|
||||
|
||||
@@ -537,7 +537,7 @@ where
|
||||
Some(tenant) => match tenant.current_state() {
|
||||
TenantState::Attaching
|
||||
| TenantState::Loading
|
||||
| TenantState::Broken
|
||||
| TenantState::Broken { .. }
|
||||
| TenantState::Active => tenant.set_stopping(),
|
||||
TenantState::Stopping => return Err(TenantStateError::IsStopping(tenant_id)),
|
||||
},
|
||||
@@ -565,7 +565,7 @@ where
|
||||
let tenants_accessor = TENANTS.read().await;
|
||||
match tenants_accessor.get(&tenant_id) {
|
||||
Some(tenant) => {
|
||||
tenant.set_broken(&e.to_string());
|
||||
tenant.set_broken(e.to_string());
|
||||
}
|
||||
None => {
|
||||
warn!("Tenant {tenant_id} got removed from memory");
|
||||
|
||||
@@ -74,7 +74,7 @@ pub(super) async fn upload_timeline_layer<'a>(
|
||||
})?;
|
||||
|
||||
storage
|
||||
.upload(Box::new(source_file), fs_size, &storage_path, None)
|
||||
.upload(source_file, fs_size, &storage_path, None)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
|
||||
@@ -209,7 +209,7 @@ async fn wait_for_active_tenant(
|
||||
loop {
|
||||
match tenant_state_updates.changed().await {
|
||||
Ok(()) => {
|
||||
let new_state = *tenant_state_updates.borrow();
|
||||
let new_state = &*tenant_state_updates.borrow();
|
||||
match new_state {
|
||||
TenantState::Active => {
|
||||
debug!("Tenant state changed to active, continuing the task loop");
|
||||
|
||||
@@ -77,6 +77,7 @@ pub(super) use self::eviction_task::EvictionTaskTenantState;
|
||||
use self::eviction_task::EvictionTaskTimelineState;
|
||||
use self::walreceiver::{WalReceiver, WalReceiverConf};
|
||||
|
||||
use super::config::TenantConf;
|
||||
use super::layer_map::BatchedUpdates;
|
||||
use super::remote_timeline_client::index::IndexPart;
|
||||
use super::remote_timeline_client::RemoteTimelineClient;
|
||||
@@ -161,7 +162,7 @@ pub struct Timeline {
|
||||
ancestor_timeline: Option<Arc<Timeline>>,
|
||||
ancestor_lsn: Lsn,
|
||||
|
||||
metrics: TimelineMetrics,
|
||||
pub(super) metrics: TimelineMetrics,
|
||||
|
||||
/// Ensures layers aren't frozen by checkpointer between
|
||||
/// [`Timeline::get_layer_for_write`] and layer reads.
|
||||
@@ -1136,6 +1137,8 @@ impl Timeline {
|
||||
if let Some(delta) = local_layer_residence_duration {
|
||||
self.metrics
|
||||
.evictions_with_low_residence_duration
|
||||
.read()
|
||||
.unwrap()
|
||||
.observe(delta);
|
||||
info!(layer=%local_layer.short_id(), residence_millis=delta.as_millis(), "evicted layer after known residence period");
|
||||
} else {
|
||||
@@ -1209,6 +1212,35 @@ impl Timeline {
|
||||
.unwrap_or(self.conf.default_tenant_conf.eviction_policy)
|
||||
}
|
||||
|
||||
fn get_evictions_low_residence_duration_metric_threshold(
|
||||
tenant_conf: &TenantConfOpt,
|
||||
default_tenant_conf: &TenantConf,
|
||||
) -> Duration {
|
||||
tenant_conf
|
||||
.evictions_low_residence_duration_metric_threshold
|
||||
.unwrap_or(default_tenant_conf.evictions_low_residence_duration_metric_threshold)
|
||||
}
|
||||
|
||||
pub(super) fn tenant_conf_updated(&self) {
|
||||
// NB: Most tenant conf options are read by background loops, so,
|
||||
// changes will automatically be picked up.
|
||||
|
||||
// The threshold is embedded in the metric. So, we need to update it.
|
||||
{
|
||||
let new_threshold = Self::get_evictions_low_residence_duration_metric_threshold(
|
||||
&self.tenant_conf.read().unwrap(),
|
||||
&self.conf.default_tenant_conf,
|
||||
);
|
||||
let tenant_id_str = self.tenant_id.to_string();
|
||||
let timeline_id_str = self.timeline_id.to_string();
|
||||
self.metrics
|
||||
.evictions_with_low_residence_duration
|
||||
.write()
|
||||
.unwrap()
|
||||
.change_threshold(&tenant_id_str, &timeline_id_str, new_threshold);
|
||||
}
|
||||
}
|
||||
|
||||
/// Open a Timeline handle.
|
||||
///
|
||||
/// Loads the metadata for the timeline into memory, but not the layer map.
|
||||
@@ -1240,6 +1272,11 @@ impl Timeline {
|
||||
let max_lsn_wal_lag = tenant_conf_guard
|
||||
.max_lsn_wal_lag
|
||||
.unwrap_or(conf.default_tenant_conf.max_lsn_wal_lag);
|
||||
let evictions_low_residence_duration_metric_threshold =
|
||||
Self::get_evictions_low_residence_duration_metric_threshold(
|
||||
&tenant_conf_guard,
|
||||
&conf.default_tenant_conf,
|
||||
);
|
||||
drop(tenant_conf_guard);
|
||||
|
||||
Arc::new_cyclic(|myself| {
|
||||
@@ -1287,7 +1324,7 @@ impl Timeline {
|
||||
&timeline_id,
|
||||
crate::metrics::EvictionsWithLowResidenceDurationBuilder::new(
|
||||
"mtime",
|
||||
conf.evictions_low_residence_duration_metric_threshold,
|
||||
evictions_low_residence_duration_metric_threshold,
|
||||
),
|
||||
),
|
||||
|
||||
@@ -1446,7 +1483,7 @@ impl Timeline {
|
||||
|
||||
trace!("found layer {}", layer.path().display());
|
||||
total_physical_size += file_size;
|
||||
updates.insert_historic(Arc::new(layer));
|
||||
updates.insert_historic(Arc::new(layer))?;
|
||||
num_layers += 1;
|
||||
} else if let Some(deltafilename) = DeltaFileName::parse_str(&fname) {
|
||||
// Create a DeltaLayer struct for each delta file.
|
||||
@@ -1478,7 +1515,7 @@ impl Timeline {
|
||||
|
||||
trace!("found layer {}", layer.path().display());
|
||||
total_physical_size += file_size;
|
||||
updates.insert_historic(Arc::new(layer));
|
||||
updates.insert_historic(Arc::new(layer))?;
|
||||
num_layers += 1;
|
||||
} else if fname == METADATA_FILE_NAME || fname.ends_with(".old") {
|
||||
// ignore these
|
||||
@@ -1552,7 +1589,7 @@ impl Timeline {
|
||||
// remote index file?
|
||||
// If so, rename_to_backup those files & replace their local layer with
|
||||
// a RemoteLayer in the layer map so that we re-download them on-demand.
|
||||
if let Some(local_layer) = local_layer {
|
||||
if let Some(local_layer) = &local_layer {
|
||||
let local_layer_path = local_layer
|
||||
.local_path()
|
||||
.expect("caller must ensure that local_layers only contains local layers");
|
||||
@@ -1577,7 +1614,6 @@ impl Timeline {
|
||||
anyhow::bail!("could not rename file {local_layer_path:?}: {err:?}");
|
||||
} else {
|
||||
self.metrics.resident_physical_size_gauge.sub(local_size);
|
||||
updates.remove_historic(local_layer);
|
||||
// fall-through to adding the remote layer
|
||||
}
|
||||
} else {
|
||||
@@ -1613,7 +1649,11 @@ impl Timeline {
|
||||
);
|
||||
let remote_layer = Arc::new(remote_layer);
|
||||
|
||||
updates.insert_historic(remote_layer);
|
||||
if let Some(local_layer) = &local_layer {
|
||||
updates.replace_historic(local_layer, remote_layer)?;
|
||||
} else {
|
||||
updates.insert_historic(remote_layer)?;
|
||||
}
|
||||
}
|
||||
LayerFileName::Delta(deltafilename) => {
|
||||
// Create a RemoteLayer for the delta file.
|
||||
@@ -1637,7 +1677,11 @@ impl Timeline {
|
||||
LayerAccessStats::for_loading_layer(LayerResidenceStatus::Evicted),
|
||||
);
|
||||
let remote_layer = Arc::new(remote_layer);
|
||||
updates.insert_historic(remote_layer);
|
||||
if let Some(local_layer) = &local_layer {
|
||||
updates.replace_historic(local_layer, remote_layer)?;
|
||||
} else {
|
||||
updates.insert_historic(remote_layer)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2684,7 +2728,7 @@ impl Timeline {
|
||||
.write()
|
||||
.unwrap()
|
||||
.batch_update()
|
||||
.insert_historic(Arc::new(new_delta));
|
||||
.insert_historic(Arc::new(new_delta))?;
|
||||
|
||||
// update the timeline's physical size
|
||||
let sz = new_delta_path.metadata()?.len();
|
||||
@@ -2889,7 +2933,7 @@ impl Timeline {
|
||||
self.metrics
|
||||
.resident_physical_size_gauge
|
||||
.add(metadata.len());
|
||||
updates.insert_historic(Arc::new(l));
|
||||
updates.insert_historic(Arc::new(l))?;
|
||||
}
|
||||
updates.flush();
|
||||
drop(layers);
|
||||
@@ -3322,7 +3366,7 @@ impl Timeline {
|
||||
|
||||
new_layer_paths.insert(new_delta_path, LayerFileMetadata::new(metadata.len()));
|
||||
let x: Arc<dyn PersistentLayer + 'static> = Arc::new(l);
|
||||
updates.insert_historic(x);
|
||||
updates.insert_historic(x)?;
|
||||
}
|
||||
|
||||
// Now that we have reshuffled the data to set of new delta layers, we can
|
||||
|
||||
@@ -23,7 +23,7 @@ hmac.workspace = true
|
||||
hostname.workspace = true
|
||||
humantime.workspace = true
|
||||
hyper-tungstenite.workspace = true
|
||||
hyper.workspace = true
|
||||
hyper = { workspace = true, features = ["http2", "http1", "tcp", "runtime"] }
|
||||
itertools.workspace = true
|
||||
md5.workspace = true
|
||||
metrics.workspace = true
|
||||
@@ -64,6 +64,8 @@ webpki-roots.workspace = true
|
||||
x509-parser.workspace = true
|
||||
|
||||
workspace_hack.workspace = true
|
||||
tokio-util.workspace = true
|
||||
percent-encoding.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
rcgen.workspace = true
|
||||
|
||||
@@ -40,7 +40,7 @@ pub fn configure_tls(
|
||||
let mut cert_resolver = CertResolver::new();
|
||||
|
||||
// add default certificate
|
||||
cert_resolver.add_cert(key_path, cert_path)?;
|
||||
cert_resolver.add_cert(key_path, cert_path, true)?;
|
||||
|
||||
// add extra certificates
|
||||
if let Some(certs_dir) = certs_dir {
|
||||
@@ -52,8 +52,11 @@ pub fn configure_tls(
|
||||
let key_path = path.join("tls.key");
|
||||
let cert_path = path.join("tls.crt");
|
||||
if key_path.exists() && cert_path.exists() {
|
||||
cert_resolver
|
||||
.add_cert(&key_path.to_string_lossy(), &cert_path.to_string_lossy())?;
|
||||
cert_resolver.add_cert(
|
||||
&key_path.to_string_lossy(),
|
||||
&cert_path.to_string_lossy(),
|
||||
false,
|
||||
)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -78,16 +81,23 @@ pub fn configure_tls(
|
||||
|
||||
struct CertResolver {
|
||||
certs: HashMap<String, Arc<rustls::sign::CertifiedKey>>,
|
||||
default: Option<Arc<rustls::sign::CertifiedKey>>,
|
||||
}
|
||||
|
||||
impl CertResolver {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
certs: HashMap::new(),
|
||||
default: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn add_cert(&mut self, key_path: &str, cert_path: &str) -> anyhow::Result<()> {
|
||||
fn add_cert(
|
||||
&mut self,
|
||||
key_path: &str,
|
||||
cert_path: &str,
|
||||
is_default: bool,
|
||||
) -> anyhow::Result<()> {
|
||||
let priv_key = {
|
||||
let key_bytes = std::fs::read(key_path).context("TLS key file")?;
|
||||
let mut keys = rustls_pemfile::pkcs8_private_keys(&mut &key_bytes[..])
|
||||
@@ -136,10 +146,13 @@ impl CertResolver {
|
||||
"Failed to parse common name from certificate at '{cert_path}'."
|
||||
))?;
|
||||
|
||||
self.certs.insert(
|
||||
common_name,
|
||||
Arc::new(rustls::sign::CertifiedKey::new(cert_chain, key)),
|
||||
);
|
||||
let cert = Arc::new(rustls::sign::CertifiedKey::new(cert_chain, key));
|
||||
|
||||
if is_default {
|
||||
self.default = Some(cert.clone());
|
||||
}
|
||||
|
||||
self.certs.insert(common_name, cert);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -172,7 +185,17 @@ impl rustls::server::ResolvesServerCert for CertResolver {
|
||||
}
|
||||
}
|
||||
} else {
|
||||
None
|
||||
// No SNI, use the default certificate, otherwise we can't get to
|
||||
// options parameter which can be used to set endpoint name too.
|
||||
// That means that non-SNI flow will not work for CNAME domains in
|
||||
// verify-full mode.
|
||||
//
|
||||
// If that will be a problem we can:
|
||||
//
|
||||
// a) Instead of multi-cert approach use single cert with extra
|
||||
// domains listed in Subject Alternative Name (SAN).
|
||||
// b) Deploy separate proxy instances for extra domains.
|
||||
self.default.as_ref().cloned()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
//! Other modules should use stuff from this module instead of
|
||||
//! directly relying on deps like `reqwest` (think loose coupling).
|
||||
|
||||
pub mod pg_to_json;
|
||||
pub mod server;
|
||||
pub mod websocket;
|
||||
|
||||
|
||||
154
proxy/src/http/pg_to_json.rs
Normal file
154
proxy/src/http/pg_to_json.rs
Normal file
@@ -0,0 +1,154 @@
|
||||
|
||||
use anyhow::{anyhow, Context};
|
||||
use chrono::{Utc, DateTime, NaiveDateTime, NaiveTime, NaiveDate};
|
||||
use serde_json::Map;
|
||||
use tokio_postgres::{
|
||||
types::{FromSql, Type},
|
||||
Column, Row,
|
||||
};
|
||||
|
||||
// some type-aliases I use in my project
|
||||
pub type JSONValue = serde_json::Value;
|
||||
pub type RowData = Map<String, JSONValue>;
|
||||
pub type Error = anyhow::Error; // from: https://github.com/dtolnay/anyhow
|
||||
|
||||
pub fn postgres_row_to_json_value(row: &Row) -> Result<JSONValue, Error> {
|
||||
let row_data = postgres_row_to_row_data(row)?;
|
||||
Ok(JSONValue::Object(row_data))
|
||||
}
|
||||
|
||||
pub fn postgres_row_to_row_data(row: &Row) -> Result<RowData, Error> {
|
||||
let mut result: Map<String, JSONValue> = Map::new();
|
||||
for (i, column) in row.columns().iter().enumerate() {
|
||||
let name = column.name();
|
||||
let json_value = pg_cell_to_json_value(&row, column, i)?;
|
||||
result.insert(name.to_string(), json_value);
|
||||
}
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
pub fn pg_cell_to_json_value(
|
||||
row: &Row,
|
||||
column: &Column,
|
||||
column_i: usize,
|
||||
) -> Result<JSONValue, Error> {
|
||||
let f64_to_json_number = |raw_val: f64| -> Result<JSONValue, Error> {
|
||||
let temp = serde_json::Number::from_f64(raw_val).ok_or(anyhow!("invalid json-float"))?;
|
||||
Ok(JSONValue::Number(temp))
|
||||
};
|
||||
Ok(match *column.type_() {
|
||||
// for rust-postgres <> postgres type-mappings: https://docs.rs/postgres/latest/postgres/types/trait.FromSql.html#types
|
||||
// for postgres types: https://www.postgresql.org/docs/7.4/datatype.html#DATATYPE-TABLE
|
||||
|
||||
Type::TIMESTAMPTZ => get_basic(row, column, column_i, |a: DateTime<Utc>| {
|
||||
Ok(JSONValue::String(a.to_rfc3339()))
|
||||
})?,
|
||||
Type::TIMESTAMP => get_basic(row, column, column_i, |a: NaiveDateTime| {
|
||||
Ok(JSONValue::String(a.format("%Y-%m-%dT%H:%M:%S%.6f").to_string()))
|
||||
})?,
|
||||
|
||||
Type::TIME => get_basic(row, column, column_i, |a: NaiveTime| {
|
||||
Ok(JSONValue::String(a.format("%H:%M:%S%.6f").to_string()))
|
||||
})?,
|
||||
Type::DATE => get_basic(row, column, column_i, |a: NaiveDate| {
|
||||
Ok(JSONValue::String(a.format("%Y-%m-%d").to_string()))
|
||||
})?,
|
||||
// no TIMETZ support?
|
||||
|
||||
// single types
|
||||
Type::BOOL => get_basic(row, column, column_i, |a: bool| Ok(JSONValue::Bool(a)))?,
|
||||
Type::INT2 => get_basic(row, column, column_i, |a: i16| {
|
||||
Ok(JSONValue::Number(serde_json::Number::from(a)))
|
||||
})?,
|
||||
Type::INT4 => get_basic(row, column, column_i, |a: i32| {
|
||||
Ok(JSONValue::Number(serde_json::Number::from(a)))
|
||||
})?,
|
||||
Type::INT8 => get_basic(row, column, column_i, |a: i64| {
|
||||
Ok(JSONValue::Number(serde_json::Number::from(a)))
|
||||
})?,
|
||||
Type::TEXT | Type::VARCHAR => {
|
||||
get_basic(row, column, column_i, |a: String| Ok(JSONValue::String(a)))?
|
||||
}
|
||||
// Type::JSON | Type::JSONB => get_basic(row, column, column_i, |a: JSONValue| Ok(a))?,
|
||||
Type::FLOAT4 => get_basic(row, column, column_i, |a: f32| f64_to_json_number(a.into()))?,
|
||||
Type::FLOAT8 => get_basic(row, column, column_i, f64_to_json_number)?,
|
||||
// these types require a custom StringCollector struct as an intermediary (see struct at bottom)
|
||||
Type::TS_VECTOR => get_basic(row, column, column_i, |a: StringCollector| {
|
||||
Ok(JSONValue::String(a.0))
|
||||
})?,
|
||||
|
||||
// array types
|
||||
Type::BOOL_ARRAY => get_array(row, column, column_i, |a: bool| Ok(JSONValue::Bool(a)))?,
|
||||
Type::INT2_ARRAY => get_array(row, column, column_i, |a: i16| {
|
||||
Ok(JSONValue::Number(serde_json::Number::from(a)))
|
||||
})?,
|
||||
Type::INT4_ARRAY => get_array(row, column, column_i, |a: i32| {
|
||||
Ok(JSONValue::Number(serde_json::Number::from(a)))
|
||||
})?,
|
||||
Type::INT8_ARRAY => get_array(row, column, column_i, |a: i64| {
|
||||
Ok(JSONValue::Number(serde_json::Number::from(a)))
|
||||
})?,
|
||||
Type::TEXT_ARRAY | Type::VARCHAR_ARRAY => {
|
||||
get_array(row, column, column_i, |a: String| Ok(JSONValue::String(a)))?
|
||||
}
|
||||
// Type::JSON_ARRAY | Type::JSONB_ARRAY => get_array(row, column, column_i, |a: JSONValue| Ok(a))?,
|
||||
Type::FLOAT4_ARRAY => get_array(row, column, column_i, f64_to_json_number)?,
|
||||
Type::FLOAT8_ARRAY => get_array(row, column, column_i, f64_to_json_number)?,
|
||||
// these types require a custom StringCollector struct as an intermediary (see struct at bottom)
|
||||
Type::TS_VECTOR_ARRAY => get_array(row, column, column_i, |a: StringCollector| {
|
||||
Ok(JSONValue::String(a.0))
|
||||
})?,
|
||||
|
||||
_ => anyhow::bail!(
|
||||
"Cannot convert pg-cell \"{}\" of type \"{}\" to a JSONValue.",
|
||||
column.name(),
|
||||
column.type_().name()
|
||||
),
|
||||
})
|
||||
}
|
||||
|
||||
fn get_basic<'a, T: FromSql<'a>>(
|
||||
row: &'a Row,
|
||||
column: &Column,
|
||||
column_i: usize,
|
||||
val_to_json_val: impl Fn(T) -> Result<JSONValue, Error>,
|
||||
) -> Result<JSONValue, Error> {
|
||||
let raw_val = row
|
||||
.try_get::<_, Option<T>>(column_i)
|
||||
.with_context(|| format!("column_name:{}", column.name()))?;
|
||||
raw_val.map_or(Ok(JSONValue::Null), val_to_json_val)
|
||||
}
|
||||
fn get_array<'a, T: FromSql<'a>>(
|
||||
row: &'a Row,
|
||||
column: &Column,
|
||||
column_i: usize,
|
||||
val_to_json_val: impl Fn(T) -> Result<JSONValue, Error>,
|
||||
) -> Result<JSONValue, Error> {
|
||||
let raw_val_array = row
|
||||
.try_get::<_, Option<Vec<T>>>(column_i)
|
||||
.with_context(|| format!("column_name:{}", column.name()))?;
|
||||
Ok(match raw_val_array {
|
||||
Some(val_array) => {
|
||||
let mut result = vec![];
|
||||
for val in val_array {
|
||||
result.push(val_to_json_val(val)?);
|
||||
}
|
||||
JSONValue::Array(result)
|
||||
}
|
||||
None => JSONValue::Null,
|
||||
})
|
||||
}
|
||||
|
||||
struct StringCollector(String);
|
||||
impl FromSql<'_> for StringCollector {
|
||||
fn from_sql(
|
||||
_: &Type,
|
||||
raw: &[u8],
|
||||
) -> Result<StringCollector, Box<dyn std::error::Error + Sync + Send>> {
|
||||
let result = std::str::from_utf8(raw)?;
|
||||
Ok(StringCollector(result.to_owned()))
|
||||
}
|
||||
fn accepts(_ty: &Type) -> bool {
|
||||
true
|
||||
}
|
||||
}
|
||||
@@ -1,15 +1,25 @@
|
||||
use crate::http::pg_to_json::postgres_row_to_json_value;
|
||||
use crate::{
|
||||
cancellation::CancelMap, config::ProxyConfig, error::io_error, proxy::handle_ws_client,
|
||||
auth, cancellation::CancelMap, config::ProxyConfig, console, error::io_error,
|
||||
proxy::handle_ws_client,
|
||||
};
|
||||
use bytes::{Buf, Bytes};
|
||||
use futures::{Sink, Stream, StreamExt};
|
||||
use futures::{Sink, Stream, StreamExt, TryStreamExt};
|
||||
use tokio_postgres::Row;
|
||||
use std::collections::HashMap;
|
||||
use hyper::{
|
||||
server::{accept, conn::AddrIncoming},
|
||||
upgrade::Upgraded,
|
||||
Body, Request, Response, StatusCode,
|
||||
Body, Method, Request, Response, StatusCode,
|
||||
};
|
||||
use hyper_tungstenite::{tungstenite::Message, HyperWebsocket, WebSocketStream};
|
||||
use percent_encoding::percent_decode;
|
||||
use pin_project_lite::pin_project;
|
||||
use pq_proto::StartupMessageParams;
|
||||
use serde_json::{Value, json};
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
use tokio_postgres::types::{ToSql};
|
||||
use std::{
|
||||
convert::Infallible,
|
||||
future::ready,
|
||||
@@ -22,7 +32,9 @@ use tokio::{
|
||||
io::{self, AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf},
|
||||
net::TcpListener,
|
||||
};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{error, info, info_span, warn, Instrument};
|
||||
use url::{Url};
|
||||
use utils::http::{error::ApiError, json::json_response};
|
||||
|
||||
// TODO: use `std::sync::Exclusive` once it's stabilized.
|
||||
@@ -40,10 +52,10 @@ pin_project! {
|
||||
}
|
||||
|
||||
impl WebSocketRw {
|
||||
pub fn new(stream: WebSocketStream<Upgraded>) -> Self {
|
||||
pub fn new(stream: WebSocketStream<Upgraded>, startup_data: Bytes) -> Self {
|
||||
Self {
|
||||
stream: stream.into(),
|
||||
bytes: Bytes::new(),
|
||||
bytes: startup_data,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -80,6 +92,7 @@ impl AsyncRead for WebSocketRw {
|
||||
cx: &mut Context<'_>,
|
||||
buf: &mut ReadBuf<'_>,
|
||||
) -> Poll<io::Result<()>> {
|
||||
|
||||
if buf.remaining() > 0 {
|
||||
let bytes = ready!(self.as_mut().poll_fill_buf(cx))?;
|
||||
let len = std::cmp::min(bytes.len(), buf.remaining());
|
||||
@@ -140,25 +153,101 @@ async fn serve_websocket(
|
||||
cancel_map: &CancelMap,
|
||||
session_id: uuid::Uuid,
|
||||
hostname: Option<String>,
|
||||
startup_data: Vec<u8>,
|
||||
) -> anyhow::Result<()> {
|
||||
|
||||
let websocket = websocket.await?;
|
||||
|
||||
handle_ws_client(
|
||||
config,
|
||||
cancel_map,
|
||||
session_id,
|
||||
WebSocketRw::new(websocket),
|
||||
WebSocketRw::new(websocket, startup_data.into()),
|
||||
hostname,
|
||||
)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
struct MyObject {
|
||||
data: Vec<u8>,
|
||||
recv_data: Vec<u8>,
|
||||
}
|
||||
|
||||
impl MyObject {
|
||||
fn new(data: Vec<u8>) -> Self {
|
||||
MyObject {
|
||||
data,
|
||||
recv_data: Vec::with_capacity(512),
|
||||
}
|
||||
}
|
||||
}
|
||||
impl AsyncRead for MyObject {
|
||||
fn poll_read(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &mut ReadBuf<'_>,
|
||||
) -> Poll<Result<(), io::Error>> {
|
||||
|
||||
let data = &self.get_mut().data;
|
||||
|
||||
// let len: usize = recv_data.len();
|
||||
// info!("len: {}", len);
|
||||
// let mut i: usize = 0;
|
||||
// let mut zcount = 0;
|
||||
// while len >= i + 5 {
|
||||
// let cmd = recv_data[i];
|
||||
// if cmd == 0x5a { zcount += 1; }
|
||||
// let size = u32::from_be_bytes(recv_data[(i + 1)..(i + 5)].try_into().unwrap());
|
||||
// info!("cmd: {} size: {} buf: {:?}", cmd, size, recv_data);
|
||||
// i += usize::try_from(size).unwrap() + 1;
|
||||
// }
|
||||
// if zcount < 2 {
|
||||
// Poll::Pending
|
||||
|
||||
// } else {
|
||||
// let mut reader = &recv_data[..];
|
||||
// Pin::new(&mut reader).poll_read(cx, buf)
|
||||
// }
|
||||
|
||||
let mut reader = &data[..];
|
||||
Pin::new(&mut reader).poll_read(cx, buf)
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncWrite for MyObject {
|
||||
fn poll_write(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &[u8],
|
||||
) -> Poll<Result<usize, io::Error>> {
|
||||
|
||||
// eprintln!("{:?}", buf);
|
||||
let recv_data = &mut self.get_mut().recv_data;
|
||||
recv_data.extend(buf);
|
||||
Poll::Ready(Ok(buf.len()))
|
||||
}
|
||||
|
||||
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
|
||||
// ...
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
|
||||
// ...
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
async fn ws_handler(
|
||||
mut request: Request<Body>,
|
||||
config: &'static ProxyConfig,
|
||||
cancel_map: Arc<CancelMap>,
|
||||
session_id: uuid::Uuid,
|
||||
cache: Arc<Mutex<ConnectionCache>>,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
|
||||
let host = request
|
||||
.headers()
|
||||
.get("host")
|
||||
@@ -168,26 +257,313 @@ async fn ws_handler(
|
||||
|
||||
// Check if the request is a websocket upgrade request.
|
||||
if hyper_tungstenite::is_upgrade_request(&request) {
|
||||
let startup_data = match request.uri().query() {
|
||||
Some(b64_str) => match base64::decode_config(b64_str, base64::URL_SAFE) {
|
||||
Ok(x) => x,
|
||||
Err(_) => {
|
||||
eprintln!("invalid WebSocket base64 startup data");
|
||||
vec![]
|
||||
}
|
||||
},
|
||||
None => vec![],
|
||||
};
|
||||
|
||||
info!("{} bytes of startup data received via WebSocket URL query", startup_data.len());
|
||||
|
||||
let (response, websocket) = hyper_tungstenite::upgrade(&mut request, None)
|
||||
.map_err(|e| ApiError::BadRequest(e.into()))?;
|
||||
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = serve_websocket(websocket, config, &cancel_map, session_id, host).await
|
||||
{
|
||||
if let Err(e) = serve_websocket(websocket, config, &cancel_map, session_id, host, startup_data).await {
|
||||
error!("error in websocket connection: {e:?}");
|
||||
}
|
||||
});
|
||||
|
||||
// Return the response so the spawned future can continue.
|
||||
Ok(response)
|
||||
|
||||
} else if request.uri().path() == "/pg-protocol" && request.method() == Method::POST {
|
||||
let mut body = request.into_body();
|
||||
let mut data = Vec::with_capacity(512);
|
||||
while let Some(chunk) = body.next().await {
|
||||
data.extend(&chunk.map_err(|e| ApiError::InternalServerError(e.into()))?);
|
||||
}
|
||||
|
||||
let mut my_object = MyObject::new(data);
|
||||
let my_object = tokio::spawn(async move {
|
||||
let result = handle_ws_client(
|
||||
config,
|
||||
&cancel_map,
|
||||
session_id,
|
||||
&mut my_object,
|
||||
host,
|
||||
).await;
|
||||
my_object
|
||||
})
|
||||
.await
|
||||
.map_err(|e| ApiError::InternalServerError(e.into()))?;
|
||||
|
||||
let response = Response::builder()
|
||||
.header("Content-Type", "application/octet-stream")
|
||||
.header("Access-Control-Allow-Origin", "*")
|
||||
.status(StatusCode::OK)
|
||||
.body(Body::from(my_object.recv_data))
|
||||
.map_err(|e| ApiError::InternalServerError(e.into()))?;
|
||||
|
||||
Ok(response)
|
||||
|
||||
} else if request.uri().path() == "/sql" && request.method() == Method::POST {
|
||||
let result = handle_sql(config, request).await;
|
||||
let status_code = match result {
|
||||
Ok(_) => StatusCode::OK,
|
||||
Err(_) => StatusCode::BAD_REQUEST
|
||||
};
|
||||
let json = match result {
|
||||
Ok(r) => serde_json::to_value(r).unwrap(),
|
||||
Err(e) => {
|
||||
let message = format!("{:?}", e);
|
||||
let code = match e.downcast_ref::<tokio_postgres::Error>() {
|
||||
Some(e) => match e.code() {
|
||||
Some(e) => serde_json::to_value(e.code()).unwrap(),
|
||||
None => Value::Null
|
||||
},
|
||||
None => Value::Null
|
||||
};
|
||||
json!({ "message": message, "code": code })
|
||||
}
|
||||
};
|
||||
json_response(status_code, json).map(|mut r| {
|
||||
r.headers_mut().insert(
|
||||
"Access-Control-Allow-Origin",
|
||||
hyper::http::HeaderValue::from_static("*"),
|
||||
);
|
||||
r
|
||||
})
|
||||
|
||||
} else if request.uri().path() == "/sleep" {
|
||||
// sleep 15ms
|
||||
tokio::time::sleep(std::time::Duration::from_millis(15)).await;
|
||||
json_response(StatusCode::OK, "done")
|
||||
|
||||
} else {
|
||||
json_response(StatusCode::OK, "Connect with a websocket client")
|
||||
json_response(StatusCode::BAD_REQUEST, "query is not supported")
|
||||
}
|
||||
}
|
||||
|
||||
fn boxed_from_json(json: Vec<Value>) -> Result<Vec<Box<dyn ToSql + Sync + Send>>, anyhow::Error> {
|
||||
json.iter().map(|value| {
|
||||
let boxed: Result<Box<dyn ToSql + Sync + Send>, anyhow::Error> = match value {
|
||||
Value::Bool(b) => Ok(Box::new(b.clone())),
|
||||
Value::Number(n) => Ok(Box::new(n.as_f64().unwrap())),
|
||||
Value::String(s) => Ok(Box::new(s.clone())),
|
||||
// TODO: support null (not like this: `Value::Null => Ok(Box::new(None::<bool>)),`)
|
||||
// TODO: support arrays
|
||||
x => Err(anyhow::anyhow!("unsupported param {:?}", x))
|
||||
};
|
||||
boxed
|
||||
})
|
||||
.collect::<Result<Vec<_>, anyhow::Error>>()
|
||||
}
|
||||
|
||||
// XXX: return different error codes
|
||||
async fn handle_sql(
|
||||
config: &'static ProxyConfig,
|
||||
request: Request<Body>
|
||||
) -> anyhow::Result<Vec<Value>> {
|
||||
|
||||
let headers = request.headers();
|
||||
|
||||
let connection_string = headers
|
||||
.get("X-Neon-ConnectionString")
|
||||
.ok_or(anyhow::anyhow!("missing connection string"))?
|
||||
.to_str()?;
|
||||
|
||||
let connection_url = Url::parse(connection_string)?;
|
||||
|
||||
let protocol = connection_url.scheme();
|
||||
if protocol != "postgres" && protocol != "postgresql" {
|
||||
return Err(anyhow::anyhow!("connection string must start with postgres: or postgresql:"))
|
||||
}
|
||||
|
||||
let mut url_path = connection_url
|
||||
.path_segments()
|
||||
.ok_or(anyhow::anyhow!("missing database name"))?;
|
||||
|
||||
let dbname = url_path
|
||||
.next()
|
||||
.ok_or(anyhow::anyhow!("invalid database name"))?;
|
||||
|
||||
let username = connection_url.username();
|
||||
|
||||
let password = connection_url
|
||||
.password()
|
||||
.ok_or(anyhow::anyhow!("no password"))?;
|
||||
|
||||
let hostname = connection_url
|
||||
.host_str()
|
||||
.ok_or(anyhow::anyhow!("no host"))?;
|
||||
|
||||
let host_header = request
|
||||
.headers()
|
||||
.get("host")
|
||||
.and_then(|h| h.to_str().ok())
|
||||
.and_then(|h| h.split(':').next());
|
||||
|
||||
match host_header {
|
||||
Some(h) if h == hostname => h,
|
||||
Some(_) => return Err(anyhow::anyhow!("mismatched host header and hostname")),
|
||||
None => return Err(anyhow::anyhow!("no host header"))
|
||||
};
|
||||
|
||||
let mut body = request.into_body();
|
||||
let mut data = Vec::with_capacity(512);
|
||||
while let Some(chunk) = body.next().await {
|
||||
data.extend(&chunk?);
|
||||
}
|
||||
|
||||
#[derive(serde::Deserialize)]
|
||||
struct QueryData {
|
||||
query: String,
|
||||
params: Vec<serde_json::Value>
|
||||
}
|
||||
let query_data: QueryData = serde_json::from_slice(&data)?;
|
||||
|
||||
let credential_params = StartupMessageParams::new([
|
||||
("user", username),
|
||||
("database", dbname),
|
||||
("application_name", "proxy_http_sql"),
|
||||
]);
|
||||
let tls = config.tls_config.as_ref();
|
||||
let common_names = tls.and_then(|tls| tls.common_names.clone());
|
||||
let creds = config
|
||||
.auth_backend
|
||||
.as_ref()
|
||||
.map(|_| auth::ClientCredentials::parse(&credential_params, Some(hostname), common_names))
|
||||
.transpose()?;
|
||||
|
||||
let extra = console::ConsoleReqExtra {
|
||||
session_id: uuid::Uuid::new_v4(),
|
||||
application_name: Some("proxy_http_sql"),
|
||||
};
|
||||
|
||||
let node = creds.wake_compute(&extra).await?.expect("msg");
|
||||
let conf = node.value.config;
|
||||
|
||||
let host = match conf.get_hosts().first().expect("no host") {
|
||||
tokio_postgres::config::Host::Tcp(host) => host,
|
||||
tokio_postgres::config::Host::Unix(_) => {
|
||||
return Err(anyhow::anyhow!("unix socket is not supported"));
|
||||
}
|
||||
};
|
||||
|
||||
let conn_string = &format!(
|
||||
"host={} port={} user={} password={} dbname={}",
|
||||
host,
|
||||
conf.get_ports().first().expect("no port"),
|
||||
username,
|
||||
password,
|
||||
dbname
|
||||
);
|
||||
|
||||
let (client, connection) = tokio_postgres::connect(conn_string, tokio_postgres::NoTls).await?;
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = connection.await {
|
||||
eprintln!("connection error: {}", e);
|
||||
}
|
||||
});
|
||||
|
||||
let query = &query_data.query;
|
||||
let query_params = boxed_from_json(query_data.params)?;
|
||||
|
||||
// TODO: find a way to catch the panic and return an error if the number of params is wrong
|
||||
let pg_rows: Vec<Row> = client
|
||||
.query_raw(query, query_params)
|
||||
.await?
|
||||
.try_collect::<Vec<Row>>()
|
||||
.await?;
|
||||
|
||||
let rows: Result<Vec<serde_json::Value>, anyhow::Error> = pg_rows
|
||||
.iter()
|
||||
.map(postgres_row_to_json_value)
|
||||
.collect();
|
||||
|
||||
let rows = rows?;
|
||||
|
||||
Ok(rows)
|
||||
}
|
||||
|
||||
pub struct ConnectionCache {
|
||||
connections: HashMap<String, tokio_postgres::Client>,
|
||||
}
|
||||
|
||||
|
||||
impl ConnectionCache {
|
||||
pub fn new() -> Arc<Mutex<Self>> {
|
||||
Arc::new(Mutex::new(Self {
|
||||
connections: HashMap::new(),
|
||||
}))
|
||||
}
|
||||
|
||||
pub async fn execute(
|
||||
cache: &Arc<Mutex<ConnectionCache>>,
|
||||
conn_string: &str,
|
||||
hostname: &str,
|
||||
sql: &str,
|
||||
) -> anyhow::Result<String> {
|
||||
// TODO: let go mutex when establishing connection
|
||||
let mut cache = cache.lock().await;
|
||||
let cache_key = format!("connstr={}, hostname={}", conn_string, hostname);
|
||||
let client = if let Some(client) = cache.connections.get(&cache_key) {
|
||||
info!("using cached connection {}", conn_string);
|
||||
client
|
||||
} else {
|
||||
info!("!!!! connecting to: {}", conn_string);
|
||||
|
||||
let (client, connection) =
|
||||
tokio_postgres::connect(conn_string, tokio_postgres::NoTls).await?;
|
||||
|
||||
tokio::spawn(async move {
|
||||
// TODO: remove connection from cache
|
||||
if let Err(e) = connection.await {
|
||||
eprintln!("connection error: {}", e);
|
||||
}
|
||||
});
|
||||
|
||||
cache.connections.insert(cache_key.clone(), client);
|
||||
cache.connections.get(&cache_key).unwrap()
|
||||
};
|
||||
|
||||
let sql = percent_decode(sql.as_bytes()).decode_utf8()?.to_string();
|
||||
|
||||
let rows: Vec<HashMap<_, _>> = client
|
||||
.simple_query(&sql)
|
||||
.await?
|
||||
.into_iter()
|
||||
.filter_map(|el| {
|
||||
if let tokio_postgres::SimpleQueryMessage::Row(row) = el {
|
||||
let mut serialized_row: HashMap<String, String> = HashMap::new();
|
||||
for i in 0..row.len() {
|
||||
let col = row.columns().get(i).map_or("?", |c| c.name());
|
||||
let val = row.get(i).unwrap_or("?");
|
||||
serialized_row.insert(col.into(), val.into());
|
||||
}
|
||||
Some(serialized_row)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
Ok(serde_json::to_string(&rows)?)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
pub async fn task_main(
|
||||
config: &'static ProxyConfig,
|
||||
cache: &'static Arc<Mutex<ConnectionCache>>,
|
||||
ws_listener: TcpListener,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
scopeguard::defer! {
|
||||
info!("websocket server has shut down");
|
||||
@@ -219,7 +595,7 @@ pub async fn task_main(
|
||||
move |req: Request<Body>| async move {
|
||||
let cancel_map = Arc::new(CancelMap::default());
|
||||
let session_id = uuid::Uuid::new_v4();
|
||||
ws_handler(req, config, cancel_map, session_id)
|
||||
ws_handler(req, config, cancel_map, session_id, cache.clone())
|
||||
.instrument(info_span!(
|
||||
"ws-client",
|
||||
session = format_args!("{session_id}")
|
||||
@@ -231,6 +607,7 @@ pub async fn task_main(
|
||||
|
||||
hyper::Server::builder(accept::from_stream(tls_listener))
|
||||
.serve(make_svc)
|
||||
.with_graceful_shutdown(cancellation_token.cancelled())
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -28,9 +28,12 @@ use config::ProxyConfig;
|
||||
use futures::FutureExt;
|
||||
use std::{borrow::Cow, future::Future, net::SocketAddr};
|
||||
use tokio::{net::TcpListener, task::JoinError};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{info, warn};
|
||||
use utils::{project_git_version, sentry_init::init_sentry};
|
||||
|
||||
use crate::http::websocket::ConnectionCache;
|
||||
|
||||
project_git_version!(GIT_VERSION);
|
||||
|
||||
/// Flattens `Result<Result<T>>` into `Result<T>`.
|
||||
@@ -52,6 +55,8 @@ async fn main() -> anyhow::Result<()> {
|
||||
let args = cli().get_matches();
|
||||
let config = build_config(&args)?;
|
||||
|
||||
let wsconn_cache = Box::leak(Box::new(ConnectionCache::new()));
|
||||
|
||||
info!("Authentication backend: {}", config.auth_backend);
|
||||
|
||||
// Check that we can bind to address before further initialization
|
||||
@@ -66,39 +71,49 @@ async fn main() -> anyhow::Result<()> {
|
||||
let proxy_address: SocketAddr = args.get_one::<String>("proxy").unwrap().parse()?;
|
||||
info!("Starting proxy on {proxy_address}");
|
||||
let proxy_listener = TcpListener::bind(proxy_address).await?;
|
||||
let cancellation_token = CancellationToken::new();
|
||||
|
||||
let mut tasks = vec![
|
||||
tokio::spawn(handle_signals()),
|
||||
tokio::spawn(http::server::task_main(http_listener)),
|
||||
tokio::spawn(proxy::task_main(config, proxy_listener)),
|
||||
tokio::spawn(console::mgmt::task_main(mgmt_listener)),
|
||||
];
|
||||
let mut client_tasks = vec![tokio::spawn(proxy::task_main(
|
||||
config,
|
||||
proxy_listener,
|
||||
cancellation_token.clone(),
|
||||
))];
|
||||
|
||||
if let Some(wss_address) = args.get_one::<String>("wss") {
|
||||
let wss_address: SocketAddr = wss_address.parse()?;
|
||||
info!("Starting wss on {wss_address}");
|
||||
let wss_listener = TcpListener::bind(wss_address).await?;
|
||||
|
||||
tasks.push(tokio::spawn(http::websocket::task_main(
|
||||
client_tasks.push(tokio::spawn(http::websocket::task_main(
|
||||
config,
|
||||
wsconn_cache,
|
||||
wss_listener,
|
||||
cancellation_token.clone(),
|
||||
)));
|
||||
}
|
||||
|
||||
let mut tasks = vec![
|
||||
tokio::spawn(handle_signals(cancellation_token)),
|
||||
tokio::spawn(http::server::task_main(http_listener)),
|
||||
tokio::spawn(console::mgmt::task_main(mgmt_listener)),
|
||||
];
|
||||
|
||||
if let Some(metrics_config) = &config.metric_collection {
|
||||
tasks.push(tokio::spawn(metrics::task_main(metrics_config)));
|
||||
}
|
||||
|
||||
// This combinator will block until either all tasks complete or
|
||||
// one of them finishes with an error (others will be cancelled).
|
||||
let tasks = tasks.into_iter().map(flatten_err);
|
||||
let _: Vec<()> = futures::future::try_join_all(tasks).await?;
|
||||
|
||||
let tasks = futures::future::try_join_all(tasks.into_iter().map(flatten_err));
|
||||
let client_tasks = futures::future::try_join_all(client_tasks.into_iter().map(flatten_err));
|
||||
tokio::select! {
|
||||
// We are only expecting an error from these forever tasks
|
||||
res = tasks => { res?; },
|
||||
res = client_tasks => { res?; },
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Handle unix signals appropriately.
|
||||
async fn handle_signals() -> anyhow::Result<()> {
|
||||
async fn handle_signals(token: CancellationToken) -> anyhow::Result<()> {
|
||||
use tokio::signal::unix::{signal, SignalKind};
|
||||
|
||||
let mut hangup = signal(SignalKind::hangup())?;
|
||||
@@ -116,11 +131,9 @@ async fn handle_signals() -> anyhow::Result<()> {
|
||||
warn!("received SIGINT, exiting immediately");
|
||||
bail!("interrupted");
|
||||
}
|
||||
// TODO: Don't accept new proxy connections.
|
||||
// TODO: Shut down once all exisiting connections have been closed.
|
||||
_ = terminate.recv() => {
|
||||
warn!("received SIGTERM, exiting immediately");
|
||||
bail!("terminated");
|
||||
warn!("received SIGTERM, shutting down once all existing connections have closed");
|
||||
token.cancel();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,6 +17,7 @@ use once_cell::sync::Lazy;
|
||||
use pq_proto::{BeMessage as Be, FeStartupPacket, StartupMessageParams};
|
||||
use std::sync::Arc;
|
||||
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{error, info, warn};
|
||||
use utils::measured_stream::MeasuredStream;
|
||||
|
||||
@@ -63,6 +64,7 @@ static NUM_BYTES_PROXIED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
pub async fn task_main(
|
||||
config: &'static ProxyConfig,
|
||||
listener: tokio::net::TcpListener,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
scopeguard::defer! {
|
||||
info!("proxy has shut down");
|
||||
@@ -72,29 +74,48 @@ pub async fn task_main(
|
||||
// will be inherited by all accepted client sockets.
|
||||
socket2::SockRef::from(&listener).set_keepalive(true)?;
|
||||
|
||||
let mut connections = tokio::task::JoinSet::new();
|
||||
let cancel_map = Arc::new(CancelMap::default());
|
||||
|
||||
loop {
|
||||
let (socket, peer_addr) = listener.accept().await?;
|
||||
info!("accepted postgres client connection from {peer_addr}");
|
||||
tokio::select! {
|
||||
accept_result = listener.accept() => {
|
||||
let (socket, peer_addr) = accept_result?;
|
||||
info!("accepted postgres client connection from {peer_addr}");
|
||||
|
||||
let session_id = uuid::Uuid::new_v4();
|
||||
let cancel_map = Arc::clone(&cancel_map);
|
||||
tokio::spawn(
|
||||
async move {
|
||||
info!("spawned a task for {peer_addr}");
|
||||
let session_id = uuid::Uuid::new_v4();
|
||||
let cancel_map = Arc::clone(&cancel_map);
|
||||
connections.spawn(
|
||||
async move {
|
||||
info!("spawned a task for {peer_addr}");
|
||||
|
||||
socket
|
||||
.set_nodelay(true)
|
||||
.context("failed to set socket option")?;
|
||||
socket
|
||||
.set_nodelay(true)
|
||||
.context("failed to set socket option")?;
|
||||
|
||||
handle_client(config, &cancel_map, session_id, socket).await
|
||||
handle_client(config, &cancel_map, session_id, socket).await
|
||||
}
|
||||
.unwrap_or_else(|e| {
|
||||
// Acknowledge that the task has finished with an error.
|
||||
error!("per-client task finished with an error: {e:#}");
|
||||
}),
|
||||
);
|
||||
}
|
||||
.unwrap_or_else(|e| {
|
||||
// Acknowledge that the task has finished with an error.
|
||||
error!("per-client task finished with an error: {e:#}");
|
||||
}),
|
||||
);
|
||||
_ = cancellation_token.cancelled() => {
|
||||
drop(listener);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
// Drain connections
|
||||
while let Some(res) = connections.join_next().await {
|
||||
if let Err(e) = res {
|
||||
if !e.is_panic() && !e.is_cancelled() {
|
||||
warn!("unexpected error from joined connection task: {e:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// TODO(tech debt): unite this with its twin below.
|
||||
|
||||
@@ -23,7 +23,6 @@ use std::convert::Infallible;
|
||||
use std::net::SocketAddr;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::task::Poll;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::sync::broadcast::error::RecvError;
|
||||
@@ -374,7 +373,7 @@ impl BrokerService for Broker {
|
||||
Ok(info) => yield info,
|
||||
Err(RecvError::Lagged(skipped_msg)) => {
|
||||
missed_msgs += skipped_msg;
|
||||
if let Poll::Ready(_) = futures::poll!(Box::pin(warn_interval.tick())) {
|
||||
if (futures::poll!(Box::pin(warn_interval.tick()))).is_ready() {
|
||||
warn!("subscription id={}, key={:?} addr={:?} dropped {} messages, channel is full",
|
||||
subscriber.id, subscriber.key, subscriber.remote_addr, missed_msgs);
|
||||
missed_msgs = 0;
|
||||
|
||||
@@ -114,7 +114,7 @@ class NeonCompare(PgCompare):
|
||||
self.timeline = self.env.neon_cli.create_timeline(branch_name, tenant_id=self.tenant)
|
||||
|
||||
# Start pg
|
||||
self._pg = self.env.postgres.create_start(branch_name, "main", self.tenant)
|
||||
self._pg = self.env.endpoints.create_start(branch_name, "main", self.tenant)
|
||||
|
||||
@property
|
||||
def pg(self) -> PgProtocol:
|
||||
|
||||
@@ -830,7 +830,7 @@ class NeonEnvBuilder:
|
||||
# Stop all the nodes.
|
||||
if self.env:
|
||||
log.info("Cleaning up all storage and compute nodes")
|
||||
self.env.postgres.stop_all()
|
||||
self.env.endpoints.stop_all()
|
||||
for sk in self.env.safekeepers:
|
||||
sk.stop(immediate=True)
|
||||
self.env.pageserver.stop(immediate=True)
|
||||
@@ -894,7 +894,7 @@ class NeonEnv:
|
||||
self.port_distributor = config.port_distributor
|
||||
self.s3_mock_server = config.mock_s3_server
|
||||
self.neon_cli = NeonCli(env=self)
|
||||
self.postgres = PostgresFactory(self)
|
||||
self.endpoints = EndpointFactory(self)
|
||||
self.safekeepers: List[Safekeeper] = []
|
||||
self.broker = config.broker
|
||||
self.remote_storage = config.remote_storage
|
||||
@@ -902,6 +902,7 @@ class NeonEnv:
|
||||
self.pg_version = config.pg_version
|
||||
self.neon_binpath = config.neon_binpath
|
||||
self.pg_distrib_dir = config.pg_distrib_dir
|
||||
self.endpoint_counter = 0
|
||||
|
||||
# generate initial tenant ID here instead of letting 'neon init' generate it,
|
||||
# so that we don't need to dig it out of the config file afterwards.
|
||||
@@ -1015,6 +1016,13 @@ class NeonEnv:
|
||||
priv = (Path(self.repo_dir) / "auth_private_key.pem").read_text()
|
||||
return AuthKeys(pub=pub, priv=priv)
|
||||
|
||||
def generate_endpoint_id(self) -> str:
|
||||
"""
|
||||
Generate a unique endpoint ID
|
||||
"""
|
||||
self.endpoint_counter += 1
|
||||
return "ep-" + str(self.endpoint_counter)
|
||||
|
||||
|
||||
@pytest.fixture(scope=shareable_scope)
|
||||
def _shared_simple_env(
|
||||
@@ -1073,7 +1081,7 @@ def neon_simple_env(_shared_simple_env: NeonEnv) -> Iterator[NeonEnv]:
|
||||
"""
|
||||
yield _shared_simple_env
|
||||
|
||||
_shared_simple_env.postgres.stop_all()
|
||||
_shared_simple_env.endpoints.stop_all()
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
@@ -1097,7 +1105,7 @@ def neon_env_builder(
|
||||
neon_env_builder.init_start().
|
||||
|
||||
After the initialization, you can launch compute nodes by calling
|
||||
the functions in the 'env.postgres' factory object, stop/start the
|
||||
the functions in the 'env.endpoints' factory object, stop/start the
|
||||
nodes, etc.
|
||||
"""
|
||||
|
||||
@@ -1438,16 +1446,16 @@ class NeonCli(AbstractNeonCli):
|
||||
args.extend(["-m", "immediate"])
|
||||
return self.raw_cli(args)
|
||||
|
||||
def pg_create(
|
||||
def endpoint_create(
|
||||
self,
|
||||
branch_name: str,
|
||||
node_name: Optional[str] = None,
|
||||
endpoint_id: Optional[str] = None,
|
||||
tenant_id: Optional[TenantId] = None,
|
||||
lsn: Optional[Lsn] = None,
|
||||
port: Optional[int] = None,
|
||||
) -> "subprocess.CompletedProcess[str]":
|
||||
args = [
|
||||
"pg",
|
||||
"endpoint",
|
||||
"create",
|
||||
"--tenant-id",
|
||||
str(tenant_id or self.env.initial_tenant),
|
||||
@@ -1460,22 +1468,22 @@ class NeonCli(AbstractNeonCli):
|
||||
args.extend(["--lsn", str(lsn)])
|
||||
if port is not None:
|
||||
args.extend(["--port", str(port)])
|
||||
if node_name is not None:
|
||||
args.append(node_name)
|
||||
if endpoint_id is not None:
|
||||
args.append(endpoint_id)
|
||||
|
||||
res = self.raw_cli(args)
|
||||
res.check_returncode()
|
||||
return res
|
||||
|
||||
def pg_start(
|
||||
def endpoint_start(
|
||||
self,
|
||||
node_name: str,
|
||||
endpoint_id: str,
|
||||
tenant_id: Optional[TenantId] = None,
|
||||
lsn: Optional[Lsn] = None,
|
||||
port: Optional[int] = None,
|
||||
) -> "subprocess.CompletedProcess[str]":
|
||||
args = [
|
||||
"pg",
|
||||
"endpoint",
|
||||
"start",
|
||||
"--tenant-id",
|
||||
str(tenant_id or self.env.initial_tenant),
|
||||
@@ -1486,30 +1494,30 @@ class NeonCli(AbstractNeonCli):
|
||||
args.append(f"--lsn={lsn}")
|
||||
if port is not None:
|
||||
args.append(f"--port={port}")
|
||||
if node_name is not None:
|
||||
args.append(node_name)
|
||||
if endpoint_id is not None:
|
||||
args.append(endpoint_id)
|
||||
|
||||
res = self.raw_cli(args)
|
||||
res.check_returncode()
|
||||
return res
|
||||
|
||||
def pg_stop(
|
||||
def endpoint_stop(
|
||||
self,
|
||||
node_name: str,
|
||||
endpoint_id: str,
|
||||
tenant_id: Optional[TenantId] = None,
|
||||
destroy=False,
|
||||
check_return_code=True,
|
||||
) -> "subprocess.CompletedProcess[str]":
|
||||
args = [
|
||||
"pg",
|
||||
"endpoint",
|
||||
"stop",
|
||||
"--tenant-id",
|
||||
str(tenant_id or self.env.initial_tenant),
|
||||
]
|
||||
if destroy:
|
||||
args.append("--destroy")
|
||||
if node_name is not None:
|
||||
args.append(node_name)
|
||||
if endpoint_id is not None:
|
||||
args.append(endpoint_id)
|
||||
|
||||
return self.raw_cli(args, check_return_code=check_return_code)
|
||||
|
||||
@@ -1905,15 +1913,26 @@ def remote_pg(
|
||||
connstr = os.getenv("BENCHMARK_CONNSTR")
|
||||
if connstr is None:
|
||||
raise ValueError("no connstr provided, use BENCHMARK_CONNSTR environment variable")
|
||||
|
||||
host = parse_dsn(connstr).get("host", "")
|
||||
is_neon = host.endswith(".neon.build")
|
||||
|
||||
start_ms = int(datetime.utcnow().timestamp() * 1000)
|
||||
with RemotePostgres(pg_bin, connstr) as remote_pg:
|
||||
if is_neon:
|
||||
timeline_id = TimelineId(remote_pg.safe_psql("SHOW neon.timeline_id")[0][0])
|
||||
|
||||
yield remote_pg
|
||||
|
||||
end_ms = int(datetime.utcnow().timestamp() * 1000)
|
||||
host = parse_dsn(connstr).get("host", "")
|
||||
if host.endswith(".neon.build"):
|
||||
if is_neon:
|
||||
# Add 10s margin to the start and end times
|
||||
allure_add_grafana_links(host, start_ms - 10_000, end_ms + 10_000)
|
||||
allure_add_grafana_links(
|
||||
host,
|
||||
timeline_id,
|
||||
start_ms - 10_000,
|
||||
end_ms + 10_000,
|
||||
)
|
||||
|
||||
|
||||
class PSQL:
|
||||
@@ -2033,6 +2052,17 @@ class NeonProxy(PgProtocol):
|
||||
self._wait_until_ready()
|
||||
return self
|
||||
|
||||
# Sends SIGTERM to the proxy if it has been started
|
||||
def terminate(self):
|
||||
if self._popen:
|
||||
self._popen.terminate()
|
||||
|
||||
# Waits for proxy to exit if it has been opened with a default timeout of
|
||||
# two seconds. Raises subprocess.TimeoutExpired if the proxy does not exit in time.
|
||||
def wait_for_exit(self, timeout=2):
|
||||
if self._popen:
|
||||
self._popen.wait(timeout=2)
|
||||
|
||||
@backoff.on_exception(backoff.expo, requests.exceptions.RequestException, max_time=10)
|
||||
def _wait_until_ready(self):
|
||||
requests.get(f"http://{self.host}:{self.http_port}/v1/status")
|
||||
@@ -2167,8 +2197,8 @@ def static_proxy(
|
||||
yield proxy
|
||||
|
||||
|
||||
class Postgres(PgProtocol):
|
||||
"""An object representing a running postgres daemon."""
|
||||
class Endpoint(PgProtocol):
|
||||
"""An object representing a Postgres compute endpoint managed by the control plane."""
|
||||
|
||||
def __init__(
|
||||
self, env: NeonEnv, tenant_id: TenantId, port: int, check_stop_result: bool = True
|
||||
@@ -2176,33 +2206,40 @@ class Postgres(PgProtocol):
|
||||
super().__init__(host="localhost", port=port, user="cloud_admin", dbname="postgres")
|
||||
self.env = env
|
||||
self.running = False
|
||||
self.node_name: Optional[str] = None # dubious, see asserts below
|
||||
self.endpoint_id: Optional[str] = None # dubious, see asserts below
|
||||
self.pgdata_dir: Optional[str] = None # Path to computenode PGDATA
|
||||
self.tenant_id = tenant_id
|
||||
self.port = port
|
||||
self.check_stop_result = check_stop_result
|
||||
# path to conf is <repo_dir>/pgdatadirs/tenants/<tenant_id>/<node_name>/postgresql.conf
|
||||
# path to conf is <repo_dir>/endpoints/<endpoint_id>/pgdata/postgresql.conf
|
||||
|
||||
def create(
|
||||
self,
|
||||
branch_name: str,
|
||||
node_name: Optional[str] = None,
|
||||
endpoint_id: Optional[str] = None,
|
||||
lsn: Optional[Lsn] = None,
|
||||
config_lines: Optional[List[str]] = None,
|
||||
) -> "Postgres":
|
||||
) -> "Endpoint":
|
||||
"""
|
||||
Create the pg data directory.
|
||||
Create a new Postgres endpoint.
|
||||
Returns self.
|
||||
"""
|
||||
|
||||
if not config_lines:
|
||||
config_lines = []
|
||||
|
||||
self.node_name = node_name or f"{branch_name}_pg_node"
|
||||
self.env.neon_cli.pg_create(
|
||||
branch_name, node_name=self.node_name, tenant_id=self.tenant_id, lsn=lsn, port=self.port
|
||||
if endpoint_id is None:
|
||||
endpoint_id = self.env.generate_endpoint_id()
|
||||
self.endpoint_id = endpoint_id
|
||||
|
||||
self.env.neon_cli.endpoint_create(
|
||||
branch_name,
|
||||
endpoint_id=self.endpoint_id,
|
||||
tenant_id=self.tenant_id,
|
||||
lsn=lsn,
|
||||
port=self.port,
|
||||
)
|
||||
path = Path("pgdatadirs") / "tenants" / str(self.tenant_id) / self.node_name
|
||||
path = Path("endpoints") / self.endpoint_id / "pgdata"
|
||||
self.pgdata_dir = os.path.join(self.env.repo_dir, path)
|
||||
|
||||
if config_lines is None:
|
||||
@@ -2215,26 +2252,30 @@ class Postgres(PgProtocol):
|
||||
|
||||
return self
|
||||
|
||||
def start(self) -> "Postgres":
|
||||
def start(self) -> "Endpoint":
|
||||
"""
|
||||
Start the Postgres instance.
|
||||
Returns self.
|
||||
"""
|
||||
|
||||
assert self.node_name is not None
|
||||
assert self.endpoint_id is not None
|
||||
|
||||
log.info(f"Starting postgres node {self.node_name}")
|
||||
log.info(f"Starting postgres endpoint {self.endpoint_id}")
|
||||
|
||||
self.env.neon_cli.pg_start(self.node_name, tenant_id=self.tenant_id, port=self.port)
|
||||
self.env.neon_cli.endpoint_start(self.endpoint_id, tenant_id=self.tenant_id, port=self.port)
|
||||
self.running = True
|
||||
|
||||
return self
|
||||
|
||||
def endpoint_path(self) -> Path:
|
||||
"""Path to endpoint directory"""
|
||||
assert self.endpoint_id
|
||||
path = Path("endpoints") / self.endpoint_id
|
||||
return self.env.repo_dir / path
|
||||
|
||||
def pg_data_dir_path(self) -> str:
|
||||
"""Path to data directory"""
|
||||
assert self.node_name
|
||||
path = Path("pgdatadirs") / "tenants" / str(self.tenant_id) / self.node_name
|
||||
return os.path.join(self.env.repo_dir, path)
|
||||
"""Path to Postgres data directory"""
|
||||
return os.path.join(self.endpoint_path(), "pgdata")
|
||||
|
||||
def pg_xact_dir_path(self) -> str:
|
||||
"""Path to pg_xact dir"""
|
||||
@@ -2248,7 +2289,7 @@ class Postgres(PgProtocol):
|
||||
"""Path to postgresql.conf"""
|
||||
return os.path.join(self.pg_data_dir_path(), "postgresql.conf")
|
||||
|
||||
def adjust_for_safekeepers(self, safekeepers: str) -> "Postgres":
|
||||
def adjust_for_safekeepers(self, safekeepers: str) -> "Endpoint":
|
||||
"""
|
||||
Adjust instance config for working with wal acceptors instead of
|
||||
pageserver (pre-configured by CLI) directly.
|
||||
@@ -2272,7 +2313,7 @@ class Postgres(PgProtocol):
|
||||
f.write("neon.safekeepers = '{}'\n".format(safekeepers))
|
||||
return self
|
||||
|
||||
def config(self, lines: List[str]) -> "Postgres":
|
||||
def config(self, lines: List[str]) -> "Endpoint":
|
||||
"""
|
||||
Add lines to postgresql.conf.
|
||||
Lines should be an array of valid postgresql.conf rows.
|
||||
@@ -2286,32 +2327,32 @@ class Postgres(PgProtocol):
|
||||
|
||||
return self
|
||||
|
||||
def stop(self) -> "Postgres":
|
||||
def stop(self) -> "Endpoint":
|
||||
"""
|
||||
Stop the Postgres instance if it's running.
|
||||
Returns self.
|
||||
"""
|
||||
|
||||
if self.running:
|
||||
assert self.node_name is not None
|
||||
self.env.neon_cli.pg_stop(
|
||||
self.node_name, self.tenant_id, check_return_code=self.check_stop_result
|
||||
assert self.endpoint_id is not None
|
||||
self.env.neon_cli.endpoint_stop(
|
||||
self.endpoint_id, self.tenant_id, check_return_code=self.check_stop_result
|
||||
)
|
||||
self.running = False
|
||||
|
||||
return self
|
||||
|
||||
def stop_and_destroy(self) -> "Postgres":
|
||||
def stop_and_destroy(self) -> "Endpoint":
|
||||
"""
|
||||
Stop the Postgres instance, then destroy it.
|
||||
Stop the Postgres instance, then destroy the endpoint.
|
||||
Returns self.
|
||||
"""
|
||||
|
||||
assert self.node_name is not None
|
||||
self.env.neon_cli.pg_stop(
|
||||
self.node_name, self.tenant_id, True, check_return_code=self.check_stop_result
|
||||
assert self.endpoint_id is not None
|
||||
self.env.neon_cli.endpoint_stop(
|
||||
self.endpoint_id, self.tenant_id, True, check_return_code=self.check_stop_result
|
||||
)
|
||||
self.node_name = None
|
||||
self.endpoint_id = None
|
||||
self.running = False
|
||||
|
||||
return self
|
||||
@@ -2319,13 +2360,12 @@ class Postgres(PgProtocol):
|
||||
def create_start(
|
||||
self,
|
||||
branch_name: str,
|
||||
node_name: Optional[str] = None,
|
||||
endpoint_id: Optional[str] = None,
|
||||
lsn: Optional[Lsn] = None,
|
||||
config_lines: Optional[List[str]] = None,
|
||||
) -> "Postgres":
|
||||
) -> "Endpoint":
|
||||
"""
|
||||
Create a Postgres instance, apply config
|
||||
and then start it.
|
||||
Create an endpoint, apply config, and start Postgres.
|
||||
Returns self.
|
||||
"""
|
||||
|
||||
@@ -2333,7 +2373,7 @@ class Postgres(PgProtocol):
|
||||
|
||||
self.create(
|
||||
branch_name=branch_name,
|
||||
node_name=node_name,
|
||||
endpoint_id=endpoint_id,
|
||||
config_lines=config_lines,
|
||||
lsn=lsn,
|
||||
).start()
|
||||
@@ -2342,7 +2382,7 @@ class Postgres(PgProtocol):
|
||||
|
||||
return self
|
||||
|
||||
def __enter__(self) -> "Postgres":
|
||||
def __enter__(self) -> "Endpoint":
|
||||
return self
|
||||
|
||||
def __exit__(
|
||||
@@ -2354,33 +2394,33 @@ class Postgres(PgProtocol):
|
||||
self.stop()
|
||||
|
||||
|
||||
class PostgresFactory:
|
||||
"""An object representing multiple running postgres daemons."""
|
||||
class EndpointFactory:
|
||||
"""An object representing multiple compute endpoints."""
|
||||
|
||||
def __init__(self, env: NeonEnv):
|
||||
self.env = env
|
||||
self.num_instances: int = 0
|
||||
self.instances: List[Postgres] = []
|
||||
self.endpoints: List[Endpoint] = []
|
||||
|
||||
def create_start(
|
||||
self,
|
||||
branch_name: str,
|
||||
node_name: Optional[str] = None,
|
||||
endpoint_id: Optional[str] = None,
|
||||
tenant_id: Optional[TenantId] = None,
|
||||
lsn: Optional[Lsn] = None,
|
||||
config_lines: Optional[List[str]] = None,
|
||||
) -> Postgres:
|
||||
pg = Postgres(
|
||||
) -> Endpoint:
|
||||
ep = Endpoint(
|
||||
self.env,
|
||||
tenant_id=tenant_id or self.env.initial_tenant,
|
||||
port=self.env.port_distributor.get_port(),
|
||||
)
|
||||
self.num_instances += 1
|
||||
self.instances.append(pg)
|
||||
self.endpoints.append(ep)
|
||||
|
||||
return pg.create_start(
|
||||
return ep.create_start(
|
||||
branch_name=branch_name,
|
||||
node_name=node_name,
|
||||
endpoint_id=endpoint_id,
|
||||
config_lines=config_lines,
|
||||
lsn=lsn,
|
||||
)
|
||||
@@ -2388,30 +2428,33 @@ class PostgresFactory:
|
||||
def create(
|
||||
self,
|
||||
branch_name: str,
|
||||
node_name: Optional[str] = None,
|
||||
endpoint_id: Optional[str] = None,
|
||||
tenant_id: Optional[TenantId] = None,
|
||||
lsn: Optional[Lsn] = None,
|
||||
config_lines: Optional[List[str]] = None,
|
||||
) -> Postgres:
|
||||
pg = Postgres(
|
||||
) -> Endpoint:
|
||||
ep = Endpoint(
|
||||
self.env,
|
||||
tenant_id=tenant_id or self.env.initial_tenant,
|
||||
port=self.env.port_distributor.get_port(),
|
||||
)
|
||||
|
||||
self.num_instances += 1
|
||||
self.instances.append(pg)
|
||||
if endpoint_id is None:
|
||||
endpoint_id = self.env.generate_endpoint_id()
|
||||
|
||||
return pg.create(
|
||||
self.num_instances += 1
|
||||
self.endpoints.append(ep)
|
||||
|
||||
return ep.create(
|
||||
branch_name=branch_name,
|
||||
node_name=node_name,
|
||||
endpoint_id=endpoint_id,
|
||||
lsn=lsn,
|
||||
config_lines=config_lines,
|
||||
)
|
||||
|
||||
def stop_all(self) -> "PostgresFactory":
|
||||
for pg in self.instances:
|
||||
pg.stop()
|
||||
def stop_all(self) -> "EndpointFactory":
|
||||
for ep in self.endpoints:
|
||||
ep.stop()
|
||||
|
||||
return self
|
||||
|
||||
@@ -2786,16 +2829,16 @@ def list_files_to_compare(pgdata_dir: Path) -> List[str]:
|
||||
def check_restored_datadir_content(
|
||||
test_output_dir: Path,
|
||||
env: NeonEnv,
|
||||
pg: Postgres,
|
||||
endpoint: Endpoint,
|
||||
):
|
||||
# Get the timeline ID. We need it for the 'basebackup' command
|
||||
timeline = TimelineId(pg.safe_psql("SHOW neon.timeline_id")[0][0])
|
||||
timeline = TimelineId(endpoint.safe_psql("SHOW neon.timeline_id")[0][0])
|
||||
|
||||
# stop postgres to ensure that files won't change
|
||||
pg.stop()
|
||||
endpoint.stop()
|
||||
|
||||
# Take a basebackup from pageserver
|
||||
restored_dir_path = env.repo_dir / f"{pg.node_name}_restored_datadir"
|
||||
restored_dir_path = env.repo_dir / f"{endpoint.endpoint_id}_restored_datadir"
|
||||
restored_dir_path.mkdir(exist_ok=True)
|
||||
|
||||
pg_bin = PgBin(test_output_dir, env.pg_distrib_dir, env.pg_version)
|
||||
@@ -2805,7 +2848,7 @@ def check_restored_datadir_content(
|
||||
{psql_path} \
|
||||
--no-psqlrc \
|
||||
postgres://localhost:{env.pageserver.service_port.pg} \
|
||||
-c 'basebackup {pg.tenant_id} {timeline}' \
|
||||
-c 'basebackup {endpoint.tenant_id} {timeline}' \
|
||||
| tar -x -C {restored_dir_path}
|
||||
"""
|
||||
|
||||
@@ -2822,8 +2865,8 @@ def check_restored_datadir_content(
|
||||
assert result.returncode == 0
|
||||
|
||||
# list files we're going to compare
|
||||
assert pg.pgdata_dir
|
||||
pgdata_files = list_files_to_compare(Path(pg.pgdata_dir))
|
||||
assert endpoint.pgdata_dir
|
||||
pgdata_files = list_files_to_compare(Path(endpoint.pgdata_dir))
|
||||
restored_files = list_files_to_compare(restored_dir_path)
|
||||
|
||||
# check that file sets are equal
|
||||
@@ -2834,12 +2877,12 @@ def check_restored_datadir_content(
|
||||
# We've already filtered all mismatching files in list_files_to_compare(),
|
||||
# so here expect that the content is identical
|
||||
(match, mismatch, error) = filecmp.cmpfiles(
|
||||
pg.pgdata_dir, restored_dir_path, pgdata_files, shallow=False
|
||||
endpoint.pgdata_dir, restored_dir_path, pgdata_files, shallow=False
|
||||
)
|
||||
log.info(f"filecmp result mismatch and error lists:\n\t mismatch={mismatch}\n\t error={error}")
|
||||
|
||||
for f in mismatch:
|
||||
f1 = os.path.join(pg.pgdata_dir, f)
|
||||
f1 = os.path.join(endpoint.pgdata_dir, f)
|
||||
f2 = os.path.join(restored_dir_path, f)
|
||||
stdout_filename = "{}.filediff".format(f2)
|
||||
|
||||
@@ -2854,24 +2897,24 @@ def check_restored_datadir_content(
|
||||
|
||||
|
||||
def wait_for_last_flush_lsn(
|
||||
env: NeonEnv, pg: Postgres, tenant: TenantId, timeline: TimelineId
|
||||
env: NeonEnv, endpoint: Endpoint, tenant: TenantId, timeline: TimelineId
|
||||
) -> Lsn:
|
||||
"""Wait for pageserver to catch up the latest flush LSN, returns the last observed lsn."""
|
||||
last_flush_lsn = Lsn(pg.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
|
||||
last_flush_lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
|
||||
return wait_for_last_record_lsn(env.pageserver.http_client(), tenant, timeline, last_flush_lsn)
|
||||
|
||||
|
||||
def wait_for_wal_insert_lsn(
|
||||
env: NeonEnv, pg: Postgres, tenant: TenantId, timeline: TimelineId
|
||||
env: NeonEnv, endpoint: Endpoint, tenant: TenantId, timeline: TimelineId
|
||||
) -> Lsn:
|
||||
"""Wait for pageserver to catch up the latest flush LSN, returns the last observed lsn."""
|
||||
last_flush_lsn = Lsn(pg.safe_psql("SELECT pg_current_wal_insert_lsn()")[0][0])
|
||||
last_flush_lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_insert_lsn()")[0][0])
|
||||
return wait_for_last_record_lsn(env.pageserver.http_client(), tenant, timeline, last_flush_lsn)
|
||||
|
||||
|
||||
def fork_at_current_lsn(
|
||||
env: NeonEnv,
|
||||
pg: Postgres,
|
||||
endpoint: Endpoint,
|
||||
new_branch_name: str,
|
||||
ancestor_branch_name: str,
|
||||
tenant_id: Optional[TenantId] = None,
|
||||
@@ -2881,7 +2924,7 @@ def fork_at_current_lsn(
|
||||
The "last LSN" is taken from the given Postgres instance. The pageserver will wait for all the
|
||||
the WAL up to that LSN to arrive in the pageserver before creating the branch.
|
||||
"""
|
||||
current_lsn = pg.safe_psql("SELECT pg_current_wal_lsn()")[0][0]
|
||||
current_lsn = endpoint.safe_psql("SELECT pg_current_wal_lsn()")[0][0]
|
||||
return env.neon_cli.create_branch(new_branch_name, ancestor_branch_name, tenant_id, current_lsn)
|
||||
|
||||
|
||||
|
||||
@@ -519,6 +519,13 @@ class PageserverHttpClient(requests.Session):
|
||||
|
||||
assert res.status_code == 200
|
||||
|
||||
def download_all_layers(self, tenant_id: TenantId, timeline_id: TimelineId):
|
||||
info = self.layer_map_info(tenant_id, timeline_id)
|
||||
for layer in info.historic_layers:
|
||||
if not layer.remote:
|
||||
continue
|
||||
self.download_layer(tenant_id, timeline_id, layer.layer_file_name)
|
||||
|
||||
def evict_layer(self, tenant_id: TenantId, timeline_id: TimelineId, layer_name: str):
|
||||
res = self.delete(
|
||||
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/layer/{layer_name}",
|
||||
|
||||
@@ -1,16 +1,20 @@
|
||||
import time
|
||||
from typing import Optional
|
||||
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.pageserver.http import PageserverHttpClient
|
||||
from fixtures.types import Lsn, TenantId, TimelineId
|
||||
|
||||
|
||||
def assert_tenant_status(
|
||||
pageserver_http: PageserverHttpClient, tenant: TenantId, expected_status: str
|
||||
def assert_tenant_state(
|
||||
pageserver_http: PageserverHttpClient,
|
||||
tenant: TenantId,
|
||||
expected_state: str,
|
||||
message: Optional[str] = None,
|
||||
):
|
||||
tenant_status = pageserver_http.tenant_status(tenant)
|
||||
log.info(f"tenant_status: {tenant_status}")
|
||||
assert tenant_status["state"] == expected_status, tenant_status
|
||||
assert tenant_status["state"]["slug"] == expected_state, message or tenant_status
|
||||
|
||||
|
||||
def tenant_exists(pageserver_http: PageserverHttpClient, tenant_id: TenantId):
|
||||
@@ -68,6 +72,7 @@ def wait_until_tenant_state(
|
||||
tenant_id: TenantId,
|
||||
expected_state: str,
|
||||
iterations: int,
|
||||
period: float = 1.0,
|
||||
) -> bool:
|
||||
"""
|
||||
Does not use `wait_until` for debugging purposes
|
||||
@@ -76,21 +81,28 @@ def wait_until_tenant_state(
|
||||
try:
|
||||
tenant = pageserver_http.tenant_status(tenant_id=tenant_id)
|
||||
log.debug(f"Tenant {tenant_id} data: {tenant}")
|
||||
if tenant["state"] == expected_state:
|
||||
if tenant["state"]["slug"] == expected_state:
|
||||
return True
|
||||
except Exception as e:
|
||||
log.debug(f"Tenant {tenant_id} state retrieval failure: {e}")
|
||||
|
||||
time.sleep(1)
|
||||
time.sleep(period)
|
||||
|
||||
raise Exception(f"Tenant {tenant_id} did not become {expected_state} in {iterations} seconds")
|
||||
|
||||
|
||||
def wait_until_tenant_active(
|
||||
pageserver_http: PageserverHttpClient, tenant_id: TenantId, iterations: int = 30
|
||||
pageserver_http: PageserverHttpClient,
|
||||
tenant_id: TenantId,
|
||||
iterations: int = 30,
|
||||
period: float = 1.0,
|
||||
):
|
||||
wait_until_tenant_state(
|
||||
pageserver_http, tenant_id, expected_state="Active", iterations=iterations
|
||||
pageserver_http,
|
||||
tenant_id,
|
||||
expected_state="Active",
|
||||
iterations=iterations,
|
||||
period=period,
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -13,6 +13,7 @@ import allure
|
||||
from psycopg2.extensions import cursor
|
||||
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.types import TimelineId
|
||||
|
||||
Fn = TypeVar("Fn", bound=Callable[..., Any])
|
||||
|
||||
@@ -186,11 +187,15 @@ def allure_attach_from_dir(dir: Path):
|
||||
allure.attach.file(source, name, attachment_type, extension)
|
||||
|
||||
|
||||
DATASOURCE_ID = "xHHYY0dVz"
|
||||
GRAFANA_URL = "https://neonprod.grafana.net"
|
||||
GRAFANA_EXPLORE_URL = f"{GRAFANA_URL}/explore"
|
||||
GRAFANA_TIMELINE_INSPECTOR_DASHBOARD_URL = f"{GRAFANA_URL}/d/8G011dlnk/timeline-inspector"
|
||||
LOGS_STAGING_DATASOURCE_ID = "xHHYY0dVz"
|
||||
|
||||
|
||||
def allure_add_grafana_links(host: str, start_ms: int, end_ms: int):
|
||||
def allure_add_grafana_links(host: str, timeline_id: TimelineId, start_ms: int, end_ms: int):
|
||||
"""Add links to server logs in Grafana to Allure report"""
|
||||
links = {}
|
||||
# We expect host to be in format like ep-divine-night-159320.us-east-2.aws.neon.build
|
||||
endpoint_id, region_id, _ = host.split(".", 2)
|
||||
|
||||
@@ -202,12 +207,12 @@ def allure_add_grafana_links(host: str, start_ms: int, end_ms: int):
|
||||
}
|
||||
|
||||
params: Dict[str, Any] = {
|
||||
"datasource": DATASOURCE_ID,
|
||||
"datasource": LOGS_STAGING_DATASOURCE_ID,
|
||||
"queries": [
|
||||
{
|
||||
"expr": "<PUT AN EXPRESSION HERE>",
|
||||
"refId": "A",
|
||||
"datasource": {"type": "loki", "uid": DATASOURCE_ID},
|
||||
"datasource": {"type": "loki", "uid": LOGS_STAGING_DATASOURCE_ID},
|
||||
"editorMode": "code",
|
||||
"queryType": "range",
|
||||
}
|
||||
@@ -220,8 +225,23 @@ def allure_add_grafana_links(host: str, start_ms: int, end_ms: int):
|
||||
for name, expr in expressions.items():
|
||||
params["queries"][0]["expr"] = expr
|
||||
query_string = urlencode({"orgId": 1, "left": json.dumps(params)})
|
||||
link = f"https://neonprod.grafana.net/explore?{query_string}"
|
||||
links[name] = f"{GRAFANA_EXPLORE_URL}?{query_string}"
|
||||
|
||||
timeline_qs = urlencode(
|
||||
{
|
||||
"orgId": 1,
|
||||
"var-environment": "victoria-metrics-aws-dev",
|
||||
"var-timeline_id": timeline_id,
|
||||
"var-endpoint_id": endpoint_id,
|
||||
"var-log_datasource": "grafanacloud-neonstaging-logs",
|
||||
"from": start_ms,
|
||||
"to": end_ms,
|
||||
}
|
||||
)
|
||||
link = f"{GRAFANA_TIMELINE_INSPECTOR_DASHBOARD_URL}?{timeline_qs}"
|
||||
links["Timeline Inspector"] = link
|
||||
|
||||
for name, link in links.items():
|
||||
allure.dynamic.link(link, name=name)
|
||||
log.info(f"{name}: {link}")
|
||||
|
||||
|
||||
@@ -52,13 +52,13 @@ def test_branch_creation_heavy_write(neon_compare: NeonCompare, n_branches: int)
|
||||
def run_pgbench(branch: str):
|
||||
log.info(f"Start a pgbench workload on branch {branch}")
|
||||
|
||||
pg = env.postgres.create_start(branch, tenant_id=tenant)
|
||||
connstr = pg.connstr()
|
||||
endpoint = env.endpoints.create_start(branch, tenant_id=tenant)
|
||||
connstr = endpoint.connstr()
|
||||
|
||||
pg_bin.run_capture(["pgbench", "-i", connstr])
|
||||
pg_bin.run_capture(["pgbench", "-c10", "-T10", connstr])
|
||||
|
||||
pg.stop()
|
||||
endpoint.stop()
|
||||
|
||||
env.neon_cli.create_branch("b0", tenant_id=tenant)
|
||||
|
||||
@@ -96,8 +96,8 @@ def test_branch_creation_many(neon_compare: NeonCompare, n_branches: int):
|
||||
|
||||
env.neon_cli.create_branch("b0")
|
||||
|
||||
pg = env.postgres.create_start("b0")
|
||||
neon_compare.pg_bin.run_capture(["pgbench", "-i", "-s10", pg.connstr()])
|
||||
endpoint = env.endpoints.create_start("b0")
|
||||
neon_compare.pg_bin.run_capture(["pgbench", "-i", "-s10", endpoint.connstr()])
|
||||
|
||||
branch_creation_durations = []
|
||||
|
||||
@@ -124,15 +124,15 @@ def test_branch_creation_many_relations(neon_compare: NeonCompare):
|
||||
|
||||
timeline_id = env.neon_cli.create_branch("root")
|
||||
|
||||
pg = env.postgres.create_start("root")
|
||||
with closing(pg.connect()) as conn:
|
||||
endpoint = env.endpoints.create_start("root")
|
||||
with closing(endpoint.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
for i in range(10000):
|
||||
cur.execute(f"CREATE TABLE t{i} as SELECT g FROM generate_series(1, 1000) g")
|
||||
|
||||
# Wait for the pageserver to finish processing all the pending WALs,
|
||||
# as we don't want the LSN wait time to be included during the branch creation
|
||||
flush_lsn = Lsn(pg.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
|
||||
flush_lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
|
||||
wait_for_last_record_lsn(
|
||||
env.pageserver.http_client(), env.initial_tenant, timeline_id, flush_lsn
|
||||
)
|
||||
@@ -142,7 +142,7 @@ def test_branch_creation_many_relations(neon_compare: NeonCompare):
|
||||
|
||||
# run a concurrent insertion to make the ancestor "busy" during the branch creation
|
||||
thread = threading.Thread(
|
||||
target=pg.safe_psql, args=("INSERT INTO t0 VALUES (generate_series(1, 100000))",)
|
||||
target=endpoint.safe_psql, args=("INSERT INTO t0 VALUES (generate_series(1, 100000))",)
|
||||
)
|
||||
thread.start()
|
||||
|
||||
|
||||
@@ -42,41 +42,41 @@ def test_compare_child_and_root_pgbench_perf(neon_compare: NeonCompare):
|
||||
neon_compare.zenbenchmark.record_pg_bench_result(branch, res)
|
||||
|
||||
env.neon_cli.create_branch("root")
|
||||
pg_root = env.postgres.create_start("root")
|
||||
pg_bin.run_capture(["pgbench", "-i", pg_root.connstr(), "-s10"])
|
||||
endpoint_root = env.endpoints.create_start("root")
|
||||
pg_bin.run_capture(["pgbench", "-i", endpoint_root.connstr(), "-s10"])
|
||||
|
||||
fork_at_current_lsn(env, pg_root, "child", "root")
|
||||
fork_at_current_lsn(env, endpoint_root, "child", "root")
|
||||
|
||||
pg_child = env.postgres.create_start("child")
|
||||
endpoint_child = env.endpoints.create_start("child")
|
||||
|
||||
run_pgbench_on_branch("root", ["pgbench", "-c10", "-T10", pg_root.connstr()])
|
||||
run_pgbench_on_branch("child", ["pgbench", "-c10", "-T10", pg_child.connstr()])
|
||||
run_pgbench_on_branch("root", ["pgbench", "-c10", "-T10", endpoint_root.connstr()])
|
||||
run_pgbench_on_branch("child", ["pgbench", "-c10", "-T10", endpoint_child.connstr()])
|
||||
|
||||
|
||||
def test_compare_child_and_root_write_perf(neon_compare: NeonCompare):
|
||||
env = neon_compare.env
|
||||
env.neon_cli.create_branch("root")
|
||||
pg_root = env.postgres.create_start("root")
|
||||
endpoint_root = env.endpoints.create_start("root")
|
||||
|
||||
pg_root.safe_psql(
|
||||
endpoint_root.safe_psql(
|
||||
"CREATE TABLE foo(key serial primary key, t text default 'foooooooooooooooooooooooooooooooooooooooooooooooooooo')",
|
||||
)
|
||||
|
||||
env.neon_cli.create_branch("child", "root")
|
||||
pg_child = env.postgres.create_start("child")
|
||||
endpoint_child = env.endpoints.create_start("child")
|
||||
|
||||
with neon_compare.record_duration("root_run_duration"):
|
||||
pg_root.safe_psql("INSERT INTO foo SELECT FROM generate_series(1,1000000)")
|
||||
endpoint_root.safe_psql("INSERT INTO foo SELECT FROM generate_series(1,1000000)")
|
||||
with neon_compare.record_duration("child_run_duration"):
|
||||
pg_child.safe_psql("INSERT INTO foo SELECT FROM generate_series(1,1000000)")
|
||||
endpoint_child.safe_psql("INSERT INTO foo SELECT FROM generate_series(1,1000000)")
|
||||
|
||||
|
||||
def test_compare_child_and_root_read_perf(neon_compare: NeonCompare):
|
||||
env = neon_compare.env
|
||||
env.neon_cli.create_branch("root")
|
||||
pg_root = env.postgres.create_start("root")
|
||||
endpoint_root = env.endpoints.create_start("root")
|
||||
|
||||
pg_root.safe_psql_many(
|
||||
endpoint_root.safe_psql_many(
|
||||
[
|
||||
"CREATE TABLE foo(key serial primary key, t text default 'foooooooooooooooooooooooooooooooooooooooooooooooooooo')",
|
||||
"INSERT INTO foo SELECT FROM generate_series(1,1000000)",
|
||||
@@ -84,12 +84,12 @@ def test_compare_child_and_root_read_perf(neon_compare: NeonCompare):
|
||||
)
|
||||
|
||||
env.neon_cli.create_branch("child", "root")
|
||||
pg_child = env.postgres.create_start("child")
|
||||
endpoint_child = env.endpoints.create_start("child")
|
||||
|
||||
with neon_compare.record_duration("root_run_duration"):
|
||||
pg_root.safe_psql("SELECT count(*) from foo")
|
||||
endpoint_root.safe_psql("SELECT count(*) from foo")
|
||||
with neon_compare.record_duration("child_run_duration"):
|
||||
pg_child.safe_psql("SELECT count(*) from foo")
|
||||
endpoint_child.safe_psql("SELECT count(*) from foo")
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------
|
||||
|
||||
@@ -35,14 +35,14 @@ def test_bulk_tenant_create(
|
||||
# if use_safekeepers == 'with_sa':
|
||||
# wa_factory.start_n_new(3)
|
||||
|
||||
pg_tenant = env.postgres.create_start(
|
||||
endpoint_tenant = env.endpoints.create_start(
|
||||
f"test_bulk_tenant_create_{tenants_count}_{i}", tenant_id=tenant
|
||||
)
|
||||
|
||||
end = timeit.default_timer()
|
||||
time_slices.append(end - start)
|
||||
|
||||
pg_tenant.stop()
|
||||
endpoint_tenant.stop()
|
||||
|
||||
zenbenchmark.record(
|
||||
"tenant_creation_time",
|
||||
|
||||
@@ -18,8 +18,8 @@ def test_bulk_update(neon_env_builder: NeonEnvBuilder, zenbenchmark, fillfactor)
|
||||
|
||||
timeline_id = env.neon_cli.create_branch("test_bulk_update")
|
||||
tenant_id = env.initial_tenant
|
||||
pg = env.postgres.create_start("test_bulk_update")
|
||||
cur = pg.connect().cursor()
|
||||
endpoint = env.endpoints.create_start("test_bulk_update")
|
||||
cur = endpoint.connect().cursor()
|
||||
cur.execute("set statement_timeout=0")
|
||||
|
||||
cur.execute(f"create table t(x integer) WITH (fillfactor={fillfactor})")
|
||||
@@ -28,13 +28,13 @@ def test_bulk_update(neon_env_builder: NeonEnvBuilder, zenbenchmark, fillfactor)
|
||||
cur.execute(f"insert into t values (generate_series(1,{n_records}))")
|
||||
|
||||
cur.execute("vacuum t")
|
||||
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
|
||||
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
|
||||
|
||||
with zenbenchmark.record_duration("update-no-prefetch"):
|
||||
cur.execute("update t set x=x+1")
|
||||
|
||||
cur.execute("vacuum t")
|
||||
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
|
||||
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
|
||||
|
||||
with zenbenchmark.record_duration("delete-no-prefetch"):
|
||||
cur.execute("delete from t")
|
||||
@@ -50,13 +50,13 @@ def test_bulk_update(neon_env_builder: NeonEnvBuilder, zenbenchmark, fillfactor)
|
||||
cur.execute(f"insert into t2 values (generate_series(1,{n_records}))")
|
||||
|
||||
cur.execute("vacuum t2")
|
||||
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
|
||||
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
|
||||
|
||||
with zenbenchmark.record_duration("update-with-prefetch"):
|
||||
cur.execute("update t2 set x=x+1")
|
||||
|
||||
cur.execute("vacuum t2")
|
||||
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
|
||||
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
|
||||
|
||||
with zenbenchmark.record_duration("delete-with-prefetch"):
|
||||
cur.execute("delete from t2")
|
||||
|
||||
@@ -33,11 +33,11 @@ def test_compaction(neon_compare: NeonCompare):
|
||||
|
||||
# Create some tables, and run a bunch of INSERTs and UPDATes on them,
|
||||
# to generate WAL and layers
|
||||
pg = env.postgres.create_start(
|
||||
endpoint = env.endpoints.create_start(
|
||||
"main", tenant_id=tenant_id, config_lines=["shared_buffers=512MB"]
|
||||
)
|
||||
|
||||
with closing(pg.connect()) as conn:
|
||||
with closing(endpoint.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
for i in range(100):
|
||||
cur.execute(f"create table tbl{i} (i int, j int);")
|
||||
@@ -45,7 +45,7 @@ def test_compaction(neon_compare: NeonCompare):
|
||||
for j in range(100):
|
||||
cur.execute(f"update tbl{i} set j = {j};")
|
||||
|
||||
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
|
||||
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
|
||||
|
||||
# First compaction generates L1 layers
|
||||
with neon_compare.zenbenchmark.record_duration("compaction"):
|
||||
|
||||
@@ -2,13 +2,13 @@ import threading
|
||||
|
||||
import pytest
|
||||
from fixtures.compare_fixtures import PgCompare
|
||||
from fixtures.neon_fixtures import Postgres
|
||||
from fixtures.neon_fixtures import PgProtocol
|
||||
|
||||
from performance.test_perf_pgbench import get_scales_matrix
|
||||
from performance.test_wal_backpressure import record_read_latency
|
||||
|
||||
|
||||
def start_write_workload(pg: Postgres, scale: int = 10):
|
||||
def start_write_workload(pg: PgProtocol, scale: int = 10):
|
||||
with pg.connect().cursor() as cur:
|
||||
cur.execute(f"create table big as select generate_series(1,{scale*100_000})")
|
||||
|
||||
|
||||
@@ -25,8 +25,8 @@ def test_layer_map(neon_env_builder: NeonEnvBuilder, zenbenchmark):
|
||||
)
|
||||
|
||||
env.neon_cli.create_timeline("test_layer_map", tenant_id=tenant)
|
||||
pg = env.postgres.create_start("test_layer_map", tenant_id=tenant)
|
||||
cur = pg.connect().cursor()
|
||||
endpoint = env.endpoints.create_start("test_layer_map", tenant_id=tenant)
|
||||
cur = endpoint.connect().cursor()
|
||||
cur.execute("create table t(x integer)")
|
||||
for i in range(n_iters):
|
||||
cur.execute(f"insert into t values (generate_series(1,{n_records}))")
|
||||
|
||||
@@ -14,19 +14,19 @@ def test_startup(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker
|
||||
# Start
|
||||
env.neon_cli.create_branch("test_startup")
|
||||
with zenbenchmark.record_duration("startup_time"):
|
||||
pg = env.postgres.create_start("test_startup")
|
||||
pg.safe_psql("select 1;")
|
||||
endpoint = env.endpoints.create_start("test_startup")
|
||||
endpoint.safe_psql("select 1;")
|
||||
|
||||
# Restart
|
||||
pg.stop_and_destroy()
|
||||
endpoint.stop_and_destroy()
|
||||
with zenbenchmark.record_duration("restart_time"):
|
||||
pg.create_start("test_startup")
|
||||
pg.safe_psql("select 1;")
|
||||
endpoint.create_start("test_startup")
|
||||
endpoint.safe_psql("select 1;")
|
||||
|
||||
# Fill up
|
||||
num_rows = 1000000 # 30 MB
|
||||
num_tables = 100
|
||||
with closing(pg.connect()) as conn:
|
||||
with closing(endpoint.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
for i in range(num_tables):
|
||||
cur.execute(f"create table t_{i} (i integer);")
|
||||
@@ -34,18 +34,18 @@ def test_startup(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker
|
||||
|
||||
# Read
|
||||
with zenbenchmark.record_duration("read_time"):
|
||||
pg.safe_psql("select * from t_0;")
|
||||
endpoint.safe_psql("select * from t_0;")
|
||||
|
||||
# Read again
|
||||
with zenbenchmark.record_duration("second_read_time"):
|
||||
pg.safe_psql("select * from t_0;")
|
||||
endpoint.safe_psql("select * from t_0;")
|
||||
|
||||
# Restart
|
||||
pg.stop_and_destroy()
|
||||
endpoint.stop_and_destroy()
|
||||
with zenbenchmark.record_duration("restart_with_data"):
|
||||
pg.create_start("test_startup")
|
||||
pg.safe_psql("select 1;")
|
||||
endpoint.create_start("test_startup")
|
||||
endpoint.safe_psql("select 1;")
|
||||
|
||||
# Read
|
||||
with zenbenchmark.record_duration("read_after_restart"):
|
||||
pg.safe_psql("select * from t_0;")
|
||||
endpoint.safe_psql("select * from t_0;")
|
||||
|
||||
@@ -22,8 +22,8 @@ def test_ancestor_branch(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
pageserver_http.configure_failpoints(("flush-frozen-before-sync", "sleep(10000)"))
|
||||
|
||||
pg_branch0 = env.postgres.create_start("main", tenant_id=tenant)
|
||||
branch0_cur = pg_branch0.connect().cursor()
|
||||
endpoint_branch0 = env.endpoints.create_start("main", tenant_id=tenant)
|
||||
branch0_cur = endpoint_branch0.connect().cursor()
|
||||
branch0_timeline = TimelineId(query_scalar(branch0_cur, "SHOW neon.timeline_id"))
|
||||
log.info(f"b0 timeline {branch0_timeline}")
|
||||
|
||||
@@ -44,10 +44,10 @@ def test_ancestor_branch(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
# Create branch1.
|
||||
env.neon_cli.create_branch("branch1", "main", tenant_id=tenant, ancestor_start_lsn=lsn_100)
|
||||
pg_branch1 = env.postgres.create_start("branch1", tenant_id=tenant)
|
||||
endpoint_branch1 = env.endpoints.create_start("branch1", tenant_id=tenant)
|
||||
log.info("postgres is running on 'branch1' branch")
|
||||
|
||||
branch1_cur = pg_branch1.connect().cursor()
|
||||
branch1_cur = endpoint_branch1.connect().cursor()
|
||||
branch1_timeline = TimelineId(query_scalar(branch1_cur, "SHOW neon.timeline_id"))
|
||||
log.info(f"b1 timeline {branch1_timeline}")
|
||||
|
||||
@@ -67,9 +67,9 @@ def test_ancestor_branch(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
# Create branch2.
|
||||
env.neon_cli.create_branch("branch2", "branch1", tenant_id=tenant, ancestor_start_lsn=lsn_200)
|
||||
pg_branch2 = env.postgres.create_start("branch2", tenant_id=tenant)
|
||||
endpoint_branch2 = env.endpoints.create_start("branch2", tenant_id=tenant)
|
||||
log.info("postgres is running on 'branch2' branch")
|
||||
branch2_cur = pg_branch2.connect().cursor()
|
||||
branch2_cur = endpoint_branch2.connect().cursor()
|
||||
|
||||
branch2_timeline = TimelineId(query_scalar(branch2_cur, "SHOW neon.timeline_id"))
|
||||
log.info(f"b2 timeline {branch2_timeline}")
|
||||
|
||||
@@ -64,9 +64,9 @@ def test_compute_auth_to_pageserver(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
branch = "test_compute_auth_to_pageserver"
|
||||
env.neon_cli.create_branch(branch)
|
||||
pg = env.postgres.create_start(branch)
|
||||
endpoint = env.endpoints.create_start(branch)
|
||||
|
||||
with closing(pg.connect()) as conn:
|
||||
with closing(endpoint.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
# we rely upon autocommit after each statement
|
||||
# as waiting for acceptors happens there
|
||||
@@ -83,7 +83,7 @@ def test_auth_failures(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
|
||||
|
||||
branch = f"test_auth_failures_auth_enabled_{auth_enabled}"
|
||||
timeline_id = env.neon_cli.create_branch(branch)
|
||||
env.postgres.create_start(branch)
|
||||
env.endpoints.create_start(branch)
|
||||
|
||||
tenant_token = env.auth_keys.generate_tenant_token(env.initial_tenant)
|
||||
invalid_tenant_token = env.auth_keys.generate_tenant_token(TenantId.generate())
|
||||
|
||||
@@ -5,7 +5,7 @@ from contextlib import closing, contextmanager
|
||||
import psycopg2.extras
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder, Postgres
|
||||
from fixtures.neon_fixtures import Endpoint, NeonEnvBuilder
|
||||
|
||||
pytest_plugins = "fixtures.neon_fixtures"
|
||||
|
||||
@@ -20,10 +20,10 @@ def pg_cur(pg):
|
||||
# Periodically check that all backpressure lags are below the configured threshold,
|
||||
# assert if they are not.
|
||||
# If the check query fails, stop the thread. Main thread should notice that and stop the test.
|
||||
def check_backpressure(pg: Postgres, stop_event: threading.Event, polling_interval=5):
|
||||
def check_backpressure(endpoint: Endpoint, stop_event: threading.Event, polling_interval=5):
|
||||
log.info("checks started")
|
||||
|
||||
with pg_cur(pg) as cur:
|
||||
with pg_cur(endpoint) as cur:
|
||||
cur.execute("CREATE EXTENSION neon") # TODO move it to neon_fixtures?
|
||||
|
||||
cur.execute("select pg_size_bytes(current_setting('max_replication_write_lag'))")
|
||||
@@ -41,7 +41,7 @@ def check_backpressure(pg: Postgres, stop_event: threading.Event, polling_interv
|
||||
max_replication_apply_lag_bytes = res[0]
|
||||
log.info(f"max_replication_apply_lag: {max_replication_apply_lag_bytes} bytes")
|
||||
|
||||
with pg_cur(pg) as cur:
|
||||
with pg_cur(endpoint) as cur:
|
||||
while not stop_event.is_set():
|
||||
try:
|
||||
cur.execute(
|
||||
@@ -102,14 +102,14 @@ def test_backpressure_received_lsn_lag(neon_env_builder: NeonEnvBuilder):
|
||||
# Create a branch for us
|
||||
env.neon_cli.create_branch("test_backpressure")
|
||||
|
||||
pg = env.postgres.create_start(
|
||||
endpoint = env.endpoints.create_start(
|
||||
"test_backpressure", config_lines=["max_replication_write_lag=30MB"]
|
||||
)
|
||||
log.info("postgres is running on 'test_backpressure' branch")
|
||||
|
||||
# setup check thread
|
||||
check_stop_event = threading.Event()
|
||||
check_thread = threading.Thread(target=check_backpressure, args=(pg, check_stop_event))
|
||||
check_thread = threading.Thread(target=check_backpressure, args=(endpoint, check_stop_event))
|
||||
check_thread.start()
|
||||
|
||||
# Configure failpoint to slow down walreceiver ingest
|
||||
@@ -125,7 +125,7 @@ def test_backpressure_received_lsn_lag(neon_env_builder: NeonEnvBuilder):
|
||||
# because of the lag and waiting for lsn to replay to arrive.
|
||||
time.sleep(2)
|
||||
|
||||
with pg_cur(pg) as cur:
|
||||
with pg_cur(endpoint) as cur:
|
||||
# Create and initialize test table
|
||||
cur.execute("CREATE TABLE foo(x bigint)")
|
||||
|
||||
|
||||
@@ -15,4 +15,4 @@ def test_basebackup_error(neon_simple_env: NeonEnv):
|
||||
pageserver_http.configure_failpoints(("basebackup-before-control-file", "return"))
|
||||
|
||||
with pytest.raises(Exception, match="basebackup-before-control-file"):
|
||||
env.postgres.create_start("test_basebackup_error")
|
||||
env.endpoints.create_start("test_basebackup_error")
|
||||
|
||||
@@ -67,9 +67,9 @@ def test_branch_and_gc(neon_simple_env: NeonEnv):
|
||||
)
|
||||
|
||||
timeline_main = env.neon_cli.create_timeline("test_main", tenant_id=tenant)
|
||||
pg_main = env.postgres.create_start("test_main", tenant_id=tenant)
|
||||
endpoint_main = env.endpoints.create_start("test_main", tenant_id=tenant)
|
||||
|
||||
main_cur = pg_main.connect().cursor()
|
||||
main_cur = endpoint_main.connect().cursor()
|
||||
|
||||
main_cur.execute(
|
||||
"CREATE TABLE foo(key serial primary key, t text default 'foooooooooooooooooooooooooooooooooooooooooooooooooooo')"
|
||||
@@ -90,9 +90,9 @@ def test_branch_and_gc(neon_simple_env: NeonEnv):
|
||||
env.neon_cli.create_branch(
|
||||
"test_branch", "test_main", tenant_id=tenant, ancestor_start_lsn=lsn1
|
||||
)
|
||||
pg_branch = env.postgres.create_start("test_branch", tenant_id=tenant)
|
||||
endpoint_branch = env.endpoints.create_start("test_branch", tenant_id=tenant)
|
||||
|
||||
branch_cur = pg_branch.connect().cursor()
|
||||
branch_cur = endpoint_branch.connect().cursor()
|
||||
branch_cur.execute("INSERT INTO foo SELECT FROM generate_series(1, 100000)")
|
||||
|
||||
assert query_scalar(branch_cur, "SELECT count(*) FROM foo") == 200000
|
||||
@@ -142,8 +142,8 @@ def test_branch_creation_before_gc(neon_simple_env: NeonEnv):
|
||||
)
|
||||
|
||||
b0 = env.neon_cli.create_branch("b0", tenant_id=tenant)
|
||||
pg0 = env.postgres.create_start("b0", tenant_id=tenant)
|
||||
res = pg0.safe_psql_many(
|
||||
endpoint0 = env.endpoints.create_start("b0", tenant_id=tenant)
|
||||
res = endpoint0.safe_psql_many(
|
||||
queries=[
|
||||
"CREATE TABLE t(key serial primary key)",
|
||||
"INSERT INTO t SELECT FROM generate_series(1, 100000)",
|
||||
|
||||
@@ -18,10 +18,10 @@ def test_branch_behind(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
# Branch at the point where only 100 rows were inserted
|
||||
env.neon_cli.create_branch("test_branch_behind")
|
||||
pgmain = env.postgres.create_start("test_branch_behind")
|
||||
endpoint_main = env.endpoints.create_start("test_branch_behind")
|
||||
log.info("postgres is running on 'test_branch_behind' branch")
|
||||
|
||||
main_cur = pgmain.connect().cursor()
|
||||
main_cur = endpoint_main.connect().cursor()
|
||||
|
||||
timeline = TimelineId(query_scalar(main_cur, "SHOW neon.timeline_id"))
|
||||
|
||||
@@ -74,15 +74,15 @@ def test_branch_behind(neon_env_builder: NeonEnvBuilder):
|
||||
"test_branch_behind_more", "test_branch_behind", ancestor_start_lsn=lsn_b
|
||||
)
|
||||
|
||||
pg_hundred = env.postgres.create_start("test_branch_behind_hundred")
|
||||
pg_more = env.postgres.create_start("test_branch_behind_more")
|
||||
endpoint_hundred = env.endpoints.create_start("test_branch_behind_hundred")
|
||||
endpoint_more = env.endpoints.create_start("test_branch_behind_more")
|
||||
|
||||
# On the 'hundred' branch, we should see only 100 rows
|
||||
hundred_cur = pg_hundred.connect().cursor()
|
||||
hundred_cur = endpoint_hundred.connect().cursor()
|
||||
assert query_scalar(hundred_cur, "SELECT count(*) FROM foo") == 100
|
||||
|
||||
# On the 'more' branch, we should see 100200 rows
|
||||
more_cur = pg_more.connect().cursor()
|
||||
more_cur = endpoint_more.connect().cursor()
|
||||
assert query_scalar(more_cur, "SELECT count(*) FROM foo") == 200100
|
||||
|
||||
# All the rows are visible on the main branch
|
||||
@@ -94,8 +94,8 @@ def test_branch_behind(neon_env_builder: NeonEnvBuilder):
|
||||
env.neon_cli.create_branch(
|
||||
"test_branch_segment_boundary", "test_branch_behind", ancestor_start_lsn=Lsn("0/3000000")
|
||||
)
|
||||
pg = env.postgres.create_start("test_branch_segment_boundary")
|
||||
assert pg.safe_psql("SELECT 1")[0][0] == 1
|
||||
endpoint = env.endpoints.create_start("test_branch_segment_boundary")
|
||||
assert endpoint.safe_psql("SELECT 1")[0][0] == 1
|
||||
|
||||
# branch at pre-initdb lsn
|
||||
with pytest.raises(Exception, match="invalid branch start lsn: .*"):
|
||||
|
||||
@@ -5,7 +5,7 @@ from typing import List
|
||||
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnv, PgBin, Postgres
|
||||
from fixtures.neon_fixtures import Endpoint, NeonEnv, PgBin
|
||||
from fixtures.types import Lsn
|
||||
from fixtures.utils import query_scalar
|
||||
from performance.test_perf_pgbench import get_scales_matrix
|
||||
@@ -40,20 +40,20 @@ def test_branching_with_pgbench(
|
||||
}
|
||||
)
|
||||
|
||||
def run_pgbench(pg: Postgres):
|
||||
connstr = pg.connstr()
|
||||
|
||||
def run_pgbench(connstr: str):
|
||||
log.info(f"Start a pgbench workload on pg {connstr}")
|
||||
|
||||
pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", connstr])
|
||||
pg_bin.run_capture(["pgbench", "-T15", connstr])
|
||||
|
||||
env.neon_cli.create_branch("b0", tenant_id=tenant)
|
||||
pgs: List[Postgres] = []
|
||||
pgs.append(env.postgres.create_start("b0", tenant_id=tenant))
|
||||
endpoints: List[Endpoint] = []
|
||||
endpoints.append(env.endpoints.create_start("b0", tenant_id=tenant))
|
||||
|
||||
threads: List[threading.Thread] = []
|
||||
threads.append(threading.Thread(target=run_pgbench, args=(pgs[0],), daemon=True))
|
||||
threads.append(
|
||||
threading.Thread(target=run_pgbench, args=(endpoints[0].connstr(),), daemon=True)
|
||||
)
|
||||
threads[-1].start()
|
||||
|
||||
thread_limit = 4
|
||||
@@ -79,16 +79,18 @@ def test_branching_with_pgbench(
|
||||
else:
|
||||
env.neon_cli.create_branch("b{}".format(i + 1), "b0", tenant_id=tenant)
|
||||
|
||||
pgs.append(env.postgres.create_start("b{}".format(i + 1), tenant_id=tenant))
|
||||
endpoints.append(env.endpoints.create_start("b{}".format(i + 1), tenant_id=tenant))
|
||||
|
||||
threads.append(threading.Thread(target=run_pgbench, args=(pgs[-1],), daemon=True))
|
||||
threads.append(
|
||||
threading.Thread(target=run_pgbench, args=(endpoints[-1].connstr(),), daemon=True)
|
||||
)
|
||||
threads[-1].start()
|
||||
|
||||
for thread in threads:
|
||||
thread.join()
|
||||
|
||||
for pg in pgs:
|
||||
res = pg.safe_psql("SELECT count(*) from pgbench_accounts")
|
||||
for ep in endpoints:
|
||||
res = ep.safe_psql("SELECT count(*) from pgbench_accounts")
|
||||
assert res[0] == (100000 * scale,)
|
||||
|
||||
|
||||
@@ -110,11 +112,11 @@ def test_branching_unnormalized_start_lsn(neon_simple_env: NeonEnv, pg_bin: PgBi
|
||||
env = neon_simple_env
|
||||
|
||||
env.neon_cli.create_branch("b0")
|
||||
pg0 = env.postgres.create_start("b0")
|
||||
endpoint0 = env.endpoints.create_start("b0")
|
||||
|
||||
pg_bin.run_capture(["pgbench", "-i", pg0.connstr()])
|
||||
pg_bin.run_capture(["pgbench", "-i", endpoint0.connstr()])
|
||||
|
||||
with pg0.cursor() as cur:
|
||||
with endpoint0.cursor() as cur:
|
||||
curr_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
|
||||
|
||||
# Specify the `start_lsn` as a number that is divided by `XLOG_BLCKSZ`
|
||||
@@ -123,6 +125,6 @@ def test_branching_unnormalized_start_lsn(neon_simple_env: NeonEnv, pg_bin: PgBi
|
||||
|
||||
log.info(f"Branching b1 from b0 starting at lsn {start_lsn}...")
|
||||
env.neon_cli.create_branch("b1", "b0", ancestor_start_lsn=start_lsn)
|
||||
pg1 = env.postgres.create_start("b1")
|
||||
endpoint1 = env.endpoints.create_start("b1")
|
||||
|
||||
pg_bin.run_capture(["pgbench", "-i", pg1.connstr()])
|
||||
pg_bin.run_capture(["pgbench", "-i", endpoint1.connstr()])
|
||||
|
||||
@@ -4,7 +4,7 @@ from typing import List, Tuple
|
||||
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, Postgres
|
||||
from fixtures.neon_fixtures import Endpoint, NeonEnv, NeonEnvBuilder
|
||||
from fixtures.types import TenantId, TimelineId
|
||||
|
||||
|
||||
@@ -24,17 +24,17 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
|
||||
]
|
||||
)
|
||||
|
||||
tenant_timelines: List[Tuple[TenantId, TimelineId, Postgres]] = []
|
||||
tenant_timelines: List[Tuple[TenantId, TimelineId, Endpoint]] = []
|
||||
|
||||
for n in range(4):
|
||||
tenant_id, timeline_id = env.neon_cli.create_tenant()
|
||||
|
||||
pg = env.postgres.create_start("main", tenant_id=tenant_id)
|
||||
with pg.cursor() as cur:
|
||||
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
|
||||
with endpoint.cursor() as cur:
|
||||
cur.execute("CREATE TABLE t(key int primary key, value text)")
|
||||
cur.execute("INSERT INTO t SELECT generate_series(1,100), 'payload'")
|
||||
pg.stop()
|
||||
tenant_timelines.append((tenant_id, timeline_id, pg))
|
||||
endpoint.stop()
|
||||
tenant_timelines.append((tenant_id, timeline_id, endpoint))
|
||||
|
||||
# Stop the pageserver
|
||||
env.pageserver.stop()
|
||||
|
||||
@@ -24,14 +24,14 @@ def test_clog_truncate(neon_simple_env: NeonEnv):
|
||||
"autovacuum_freeze_max_age=100000",
|
||||
]
|
||||
|
||||
pg = env.postgres.create_start("test_clog_truncate", config_lines=config)
|
||||
endpoint = env.endpoints.create_start("test_clog_truncate", config_lines=config)
|
||||
log.info("postgres is running on test_clog_truncate branch")
|
||||
|
||||
# Install extension containing function needed for test
|
||||
pg.safe_psql("CREATE EXTENSION neon_test_utils")
|
||||
endpoint.safe_psql("CREATE EXTENSION neon_test_utils")
|
||||
|
||||
# Consume many xids to advance clog
|
||||
with pg.cursor() as cur:
|
||||
with endpoint.cursor() as cur:
|
||||
cur.execute("select test_consume_xids(1000*1000*10);")
|
||||
log.info("xids consumed")
|
||||
|
||||
@@ -44,7 +44,7 @@ def test_clog_truncate(neon_simple_env: NeonEnv):
|
||||
|
||||
# wait for autovacuum to truncate the pg_xact
|
||||
# XXX Is it worth to add a timeout here?
|
||||
pg_xact_0000_path = os.path.join(pg.pg_xact_dir_path(), "0000")
|
||||
pg_xact_0000_path = os.path.join(endpoint.pg_xact_dir_path(), "0000")
|
||||
log.info(f"pg_xact_0000_path = {pg_xact_0000_path}")
|
||||
|
||||
while os.path.isfile(pg_xact_0000_path):
|
||||
@@ -52,7 +52,7 @@ def test_clog_truncate(neon_simple_env: NeonEnv):
|
||||
time.sleep(5)
|
||||
|
||||
# checkpoint to advance latest lsn
|
||||
with pg.cursor() as cur:
|
||||
with endpoint.cursor() as cur:
|
||||
cur.execute("CHECKPOINT;")
|
||||
lsn_after_truncation = query_scalar(cur, "select pg_current_wal_insert_lsn()")
|
||||
|
||||
@@ -61,10 +61,10 @@ def test_clog_truncate(neon_simple_env: NeonEnv):
|
||||
env.neon_cli.create_branch(
|
||||
"test_clog_truncate_new", "test_clog_truncate", ancestor_start_lsn=lsn_after_truncation
|
||||
)
|
||||
pg2 = env.postgres.create_start("test_clog_truncate_new")
|
||||
endpoint2 = env.endpoints.create_start("test_clog_truncate_new")
|
||||
log.info("postgres is running on test_clog_truncate_new branch")
|
||||
|
||||
# check that new node doesn't contain truncated segment
|
||||
pg_xact_0000_path_new = os.path.join(pg2.pg_xact_dir_path(), "0000")
|
||||
pg_xact_0000_path_new = os.path.join(endpoint2.pg_xact_dir_path(), "0000")
|
||||
log.info(f"pg_xact_0000_path_new = {pg_xact_0000_path_new}")
|
||||
assert os.path.isfile(pg_xact_0000_path_new) is False
|
||||
|
||||
@@ -24,8 +24,8 @@ def test_lsof_pageserver_pid(neon_simple_env: NeonEnv):
|
||||
|
||||
def start_workload():
|
||||
env.neon_cli.create_branch("test_lsof_pageserver_pid")
|
||||
pg = env.postgres.create_start("test_lsof_pageserver_pid")
|
||||
with closing(pg.connect()) as conn:
|
||||
endpoint = env.endpoints.create_start("test_lsof_pageserver_pid")
|
||||
with closing(endpoint.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("CREATE TABLE foo as SELECT x FROM generate_series(1,100000) x")
|
||||
cur.execute("update foo set x=x+1")
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import copy
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
@@ -55,29 +56,31 @@ def test_create_snapshot(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, test_o
|
||||
neon_env_builder.preserve_database_files = True
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
pg = env.postgres.create_start("main")
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
|
||||
# FIXME: Is this expected?
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*init_tenant_mgr: marking .* as locally complete, while it doesnt exist in remote index.*"
|
||||
)
|
||||
|
||||
pg_bin.run(["pgbench", "--initialize", "--scale=10", pg.connstr()])
|
||||
pg_bin.run(["pgbench", "--time=60", "--progress=2", pg.connstr()])
|
||||
pg_bin.run(["pg_dumpall", f"--dbname={pg.connstr()}", f"--file={test_output_dir / 'dump.sql'}"])
|
||||
pg_bin.run(["pgbench", "--initialize", "--scale=10", endpoint.connstr()])
|
||||
pg_bin.run(["pgbench", "--time=60", "--progress=2", endpoint.connstr()])
|
||||
pg_bin.run(
|
||||
["pg_dumpall", f"--dbname={endpoint.connstr()}", f"--file={test_output_dir / 'dump.sql'}"]
|
||||
)
|
||||
|
||||
snapshot_config = toml.load(test_output_dir / "repo" / "config")
|
||||
tenant_id = snapshot_config["default_tenant_id"]
|
||||
timeline_id = dict(snapshot_config["branch_name_mappings"]["main"])[tenant_id]
|
||||
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
lsn = Lsn(pg.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
|
||||
lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
|
||||
|
||||
wait_for_last_record_lsn(pageserver_http, tenant_id, timeline_id, lsn)
|
||||
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
|
||||
wait_for_upload(pageserver_http, tenant_id, timeline_id, lsn)
|
||||
|
||||
env.postgres.stop_all()
|
||||
env.endpoints.stop_all()
|
||||
for sk in env.safekeepers:
|
||||
sk.stop()
|
||||
env.pageserver.stop()
|
||||
@@ -98,6 +101,9 @@ def test_backward_compatibility(
|
||||
pg_version: str,
|
||||
request: FixtureRequest,
|
||||
):
|
||||
"""
|
||||
Test that the new binaries can read old data
|
||||
"""
|
||||
compatibility_snapshot_dir_env = os.environ.get("COMPATIBILITY_SNAPSHOT_DIR")
|
||||
assert (
|
||||
compatibility_snapshot_dir_env is not None
|
||||
@@ -120,6 +126,7 @@ def test_backward_compatibility(
|
||||
check_neon_works(
|
||||
test_output_dir / "compatibility_snapshot" / "repo",
|
||||
neon_binpath,
|
||||
neon_binpath,
|
||||
pg_distrib_dir,
|
||||
pg_version,
|
||||
port_distributor,
|
||||
@@ -148,7 +155,11 @@ def test_forward_compatibility(
|
||||
port_distributor: PortDistributor,
|
||||
pg_version: str,
|
||||
request: FixtureRequest,
|
||||
neon_binpath: Path,
|
||||
):
|
||||
"""
|
||||
Test that the old binaries can read new data
|
||||
"""
|
||||
compatibility_neon_bin_env = os.environ.get("COMPATIBILITY_NEON_BIN")
|
||||
assert compatibility_neon_bin_env is not None, (
|
||||
"COMPATIBILITY_NEON_BIN is not set. It should be set to a path with Neon binaries "
|
||||
@@ -183,6 +194,7 @@ def test_forward_compatibility(
|
||||
check_neon_works(
|
||||
test_output_dir / "compatibility_snapshot" / "repo",
|
||||
compatibility_neon_bin,
|
||||
neon_binpath,
|
||||
compatibility_postgres_distrib_dir,
|
||||
pg_version,
|
||||
port_distributor,
|
||||
@@ -223,9 +235,13 @@ def prepare_snapshot(
|
||||
for logfile in repo_dir.glob("**/*.log"):
|
||||
logfile.unlink()
|
||||
|
||||
# Remove tenants data for compute
|
||||
for tenant in (repo_dir / "pgdatadirs" / "tenants").glob("*"):
|
||||
shutil.rmtree(tenant)
|
||||
# Remove old computes in 'endpoints'. Old versions of the control plane used a directory
|
||||
# called "pgdatadirs". Delete it, too.
|
||||
if (repo_dir / "endpoints").exists():
|
||||
shutil.rmtree(repo_dir / "endpoints")
|
||||
if (repo_dir / "pgdatadirs").exists():
|
||||
shutil.rmtree(repo_dir / "pgdatadirs")
|
||||
os.mkdir(repo_dir / "endpoints")
|
||||
|
||||
# Remove wal-redo temp directory if it exists. Newer pageserver versions don't create
|
||||
# them anymore, but old versions did.
|
||||
@@ -326,7 +342,8 @@ def get_neon_version(neon_binpath: Path):
|
||||
|
||||
def check_neon_works(
|
||||
repo_dir: Path,
|
||||
neon_binpath: Path,
|
||||
neon_target_binpath: Path,
|
||||
neon_current_binpath: Path,
|
||||
pg_distrib_dir: Path,
|
||||
pg_version: str,
|
||||
port_distributor: PortDistributor,
|
||||
@@ -336,7 +353,7 @@ def check_neon_works(
|
||||
):
|
||||
snapshot_config_toml = repo_dir / "config"
|
||||
snapshot_config = toml.load(snapshot_config_toml)
|
||||
snapshot_config["neon_distrib_dir"] = str(neon_binpath)
|
||||
snapshot_config["neon_distrib_dir"] = str(neon_target_binpath)
|
||||
snapshot_config["postgres_distrib_dir"] = str(pg_distrib_dir)
|
||||
with (snapshot_config_toml).open("w") as f:
|
||||
toml.dump(snapshot_config, f)
|
||||
@@ -347,17 +364,25 @@ def check_neon_works(
|
||||
config.repo_dir = repo_dir
|
||||
config.pg_version = pg_version
|
||||
config.initial_tenant = snapshot_config["default_tenant_id"]
|
||||
config.neon_binpath = neon_binpath
|
||||
config.pg_distrib_dir = pg_distrib_dir
|
||||
config.preserve_database_files = True
|
||||
|
||||
cli = NeonCli(config)
|
||||
cli.raw_cli(["start"])
|
||||
request.addfinalizer(lambda: cli.raw_cli(["stop"]))
|
||||
# Use the "target" binaries to launch the storage nodes
|
||||
config_target = config
|
||||
config_target.neon_binpath = neon_target_binpath
|
||||
cli_target = NeonCli(config_target)
|
||||
|
||||
# And the current binaries to launch computes
|
||||
config_current = copy.copy(config)
|
||||
config_current.neon_binpath = neon_current_binpath
|
||||
cli_current = NeonCli(config_current)
|
||||
|
||||
cli_target.raw_cli(["start"])
|
||||
request.addfinalizer(lambda: cli_target.raw_cli(["stop"]))
|
||||
|
||||
pg_port = port_distributor.get_port()
|
||||
cli.pg_start("main", port=pg_port)
|
||||
request.addfinalizer(lambda: cli.pg_stop("main"))
|
||||
cli_current.endpoint_start("main", port=pg_port)
|
||||
request.addfinalizer(lambda: cli_current.endpoint_stop("main"))
|
||||
|
||||
connstr = f"host=127.0.0.1 port={pg_port} user=cloud_admin dbname=postgres"
|
||||
pg_bin.run(["pg_dumpall", f"--dbname={connstr}", f"--file={test_output_dir / 'dump.sql'}"])
|
||||
|
||||
@@ -13,10 +13,10 @@ def test_sync_safekeepers_logs(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
|
||||
ctl = ComputeCtl(env)
|
||||
|
||||
env.neon_cli.create_branch("test_compute_ctl", "main")
|
||||
pg = env.postgres.create_start("test_compute_ctl")
|
||||
pg.safe_psql("CREATE TABLE t(key int primary key, value text)")
|
||||
endpoint = env.endpoints.create_start("test_compute_ctl")
|
||||
endpoint.safe_psql("CREATE TABLE t(key int primary key, value text)")
|
||||
|
||||
with open(pg.config_file_path(), "r") as f:
|
||||
with open(endpoint.config_file_path(), "r") as f:
|
||||
cfg_lines = f.readlines()
|
||||
cfg_map = {}
|
||||
for line in cfg_lines:
|
||||
@@ -24,10 +24,13 @@ def test_sync_safekeepers_logs(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
|
||||
k, v = line.split("=")
|
||||
cfg_map[k] = v.strip("\n '\"")
|
||||
log.info(f"postgres config: {cfg_map}")
|
||||
pgdata = pg.pg_data_dir_path()
|
||||
pgdata = endpoint.pg_data_dir_path()
|
||||
pg_bin_path = os.path.join(pg_bin.pg_bin_path, "postgres")
|
||||
|
||||
pg.stop_and_destroy()
|
||||
endpoint.stop_and_destroy()
|
||||
|
||||
# stop_and_destroy removes the whole endpoint directory. Recreate it.
|
||||
Path(pgdata).mkdir(parents=True)
|
||||
|
||||
spec = (
|
||||
"""
|
||||
|
||||
@@ -12,10 +12,10 @@ def test_config(neon_simple_env: NeonEnv):
|
||||
env.neon_cli.create_branch("test_config", "empty")
|
||||
|
||||
# change config
|
||||
pg = env.postgres.create_start("test_config", config_lines=["log_min_messages=debug1"])
|
||||
endpoint = env.endpoints.create_start("test_config", config_lines=["log_min_messages=debug1"])
|
||||
log.info("postgres is running on test_config branch")
|
||||
|
||||
with closing(pg.connect()) as conn:
|
||||
with closing(endpoint.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
"""
|
||||
|
||||
@@ -21,11 +21,11 @@ def test_crafted_wal_end(neon_env_builder: NeonEnvBuilder, wal_type: str):
|
||||
env = neon_env_builder.init_start()
|
||||
env.neon_cli.create_branch("test_crafted_wal_end")
|
||||
|
||||
pg = env.postgres.create("test_crafted_wal_end")
|
||||
endpoint = env.endpoints.create("test_crafted_wal_end")
|
||||
wal_craft = WalCraft(env)
|
||||
pg.config(wal_craft.postgres_config())
|
||||
pg.start()
|
||||
res = pg.safe_psql_many(
|
||||
endpoint.config(wal_craft.postgres_config())
|
||||
endpoint.start()
|
||||
res = endpoint.safe_psql_many(
|
||||
queries=[
|
||||
"CREATE TABLE keys(key int primary key)",
|
||||
"INSERT INTO keys SELECT generate_series(1, 100)",
|
||||
@@ -34,7 +34,7 @@ def test_crafted_wal_end(neon_env_builder: NeonEnvBuilder, wal_type: str):
|
||||
)
|
||||
assert res[-1][0] == (5050,)
|
||||
|
||||
wal_craft.in_existing(wal_type, pg.connstr())
|
||||
wal_craft.in_existing(wal_type, endpoint.connstr())
|
||||
|
||||
log.info("Restarting all safekeepers and pageservers")
|
||||
env.pageserver.stop()
|
||||
@@ -43,7 +43,7 @@ def test_crafted_wal_end(neon_env_builder: NeonEnvBuilder, wal_type: str):
|
||||
env.pageserver.start()
|
||||
|
||||
log.info("Trying more queries")
|
||||
res = pg.safe_psql_many(
|
||||
res = endpoint.safe_psql_many(
|
||||
queries=[
|
||||
"SELECT SUM(key) FROM keys",
|
||||
"INSERT INTO keys SELECT generate_series(101, 200)",
|
||||
@@ -60,7 +60,7 @@ def test_crafted_wal_end(neon_env_builder: NeonEnvBuilder, wal_type: str):
|
||||
env.pageserver.start()
|
||||
|
||||
log.info("Trying more queries (again)")
|
||||
res = pg.safe_psql_many(
|
||||
res = endpoint.safe_psql_many(
|
||||
queries=[
|
||||
"SELECT SUM(key) FROM keys",
|
||||
"INSERT INTO keys SELECT generate_series(201, 300)",
|
||||
|
||||
@@ -13,10 +13,10 @@ def test_createdb(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
env.neon_cli.create_branch("test_createdb", "empty")
|
||||
|
||||
pg = env.postgres.create_start("test_createdb")
|
||||
endpoint = env.endpoints.create_start("test_createdb")
|
||||
log.info("postgres is running on 'test_createdb' branch")
|
||||
|
||||
with pg.cursor() as cur:
|
||||
with endpoint.cursor() as cur:
|
||||
# Cause a 'relmapper' change in the original branch
|
||||
cur.execute("VACUUM FULL pg_class")
|
||||
|
||||
@@ -26,10 +26,10 @@ def test_createdb(neon_simple_env: NeonEnv):
|
||||
|
||||
# Create a branch
|
||||
env.neon_cli.create_branch("test_createdb2", "test_createdb", ancestor_start_lsn=lsn)
|
||||
pg2 = env.postgres.create_start("test_createdb2")
|
||||
endpoint2 = env.endpoints.create_start("test_createdb2")
|
||||
|
||||
# Test that you can connect to the new database on both branches
|
||||
for db in (pg, pg2):
|
||||
for db in (endpoint, endpoint2):
|
||||
with db.cursor(dbname="foodb") as cur:
|
||||
# Check database size in both branches
|
||||
cur.execute(
|
||||
@@ -55,17 +55,17 @@ def test_createdb(neon_simple_env: NeonEnv):
|
||||
def test_dropdb(neon_simple_env: NeonEnv, test_output_dir):
|
||||
env = neon_simple_env
|
||||
env.neon_cli.create_branch("test_dropdb", "empty")
|
||||
pg = env.postgres.create_start("test_dropdb")
|
||||
endpoint = env.endpoints.create_start("test_dropdb")
|
||||
log.info("postgres is running on 'test_dropdb' branch")
|
||||
|
||||
with pg.cursor() as cur:
|
||||
with endpoint.cursor() as cur:
|
||||
cur.execute("CREATE DATABASE foodb")
|
||||
|
||||
lsn_before_drop = query_scalar(cur, "SELECT pg_current_wal_insert_lsn()")
|
||||
|
||||
dboid = query_scalar(cur, "SELECT oid FROM pg_database WHERE datname='foodb';")
|
||||
|
||||
with pg.cursor() as cur:
|
||||
with endpoint.cursor() as cur:
|
||||
cur.execute("DROP DATABASE foodb")
|
||||
|
||||
cur.execute("CHECKPOINT")
|
||||
@@ -76,29 +76,29 @@ def test_dropdb(neon_simple_env: NeonEnv, test_output_dir):
|
||||
env.neon_cli.create_branch(
|
||||
"test_before_dropdb", "test_dropdb", ancestor_start_lsn=lsn_before_drop
|
||||
)
|
||||
pg_before = env.postgres.create_start("test_before_dropdb")
|
||||
endpoint_before = env.endpoints.create_start("test_before_dropdb")
|
||||
|
||||
env.neon_cli.create_branch(
|
||||
"test_after_dropdb", "test_dropdb", ancestor_start_lsn=lsn_after_drop
|
||||
)
|
||||
pg_after = env.postgres.create_start("test_after_dropdb")
|
||||
endpoint_after = env.endpoints.create_start("test_after_dropdb")
|
||||
|
||||
# Test that database exists on the branch before drop
|
||||
pg_before.connect(dbname="foodb").close()
|
||||
endpoint_before.connect(dbname="foodb").close()
|
||||
|
||||
# Test that database subdir exists on the branch before drop
|
||||
assert pg_before.pgdata_dir
|
||||
dbpath = pathlib.Path(pg_before.pgdata_dir) / "base" / str(dboid)
|
||||
assert endpoint_before.pgdata_dir
|
||||
dbpath = pathlib.Path(endpoint_before.pgdata_dir) / "base" / str(dboid)
|
||||
log.info(dbpath)
|
||||
|
||||
assert os.path.isdir(dbpath) is True
|
||||
|
||||
# Test that database subdir doesn't exist on the branch after drop
|
||||
assert pg_after.pgdata_dir
|
||||
dbpath = pathlib.Path(pg_after.pgdata_dir) / "base" / str(dboid)
|
||||
assert endpoint_after.pgdata_dir
|
||||
dbpath = pathlib.Path(endpoint_after.pgdata_dir) / "base" / str(dboid)
|
||||
log.info(dbpath)
|
||||
|
||||
assert os.path.isdir(dbpath) is False
|
||||
|
||||
# Check that we restore the content of the datadir correctly
|
||||
check_restored_datadir_content(test_output_dir, env, pg)
|
||||
check_restored_datadir_content(test_output_dir, env, endpoint)
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user