mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-30 11:30:37 +00:00
Compare commits
46 Commits
conrad/com
...
elizabeth/
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2ff2eb6a9e | ||
|
|
3d822dbbde | ||
|
|
af46b5286f | ||
|
|
47f7efee06 | ||
|
|
bdfc6d3ef9 | ||
|
|
f47e90fd42 | ||
|
|
868c38f522 | ||
|
|
9cc79672f3 | ||
|
|
4a9b1ad5cb | ||
|
|
c8b2ac93cf | ||
|
|
dc4238896a | ||
|
|
e1fa844da4 | ||
|
|
c8a2612207 | ||
|
|
b6e89a3af8 | ||
|
|
b2954d16ff | ||
|
|
79485e7c3a | ||
|
|
261a9ae093 | ||
|
|
cac4ee8ea3 | ||
|
|
eaf1ab21c4 | ||
|
|
6508f4e5c1 | ||
|
|
a298d2c29b | ||
|
|
8b197de7ff | ||
|
|
15d079cd41 | ||
|
|
7636c4085a | ||
|
|
dc1625cd8e | ||
|
|
a6d4de25cd | ||
|
|
ec1452a559 | ||
|
|
1950ccfe33 | ||
|
|
2ca6665f4a | ||
|
|
fa954671b2 | ||
|
|
6f4ffdb48b | ||
|
|
3f676df3d5 | ||
|
|
20f4febce1 | ||
|
|
762905cf8d | ||
|
|
830ef35ed3 | ||
|
|
d8d62fb7cb | ||
|
|
e6a404c66d | ||
|
|
7e711ede44 | ||
|
|
e95f2f9a67 | ||
|
|
5a045e7d52 | ||
|
|
67fbc0582e | ||
|
|
3af6b3a2bf | ||
|
|
04013929cb | ||
|
|
83069f6ca1 | ||
|
|
7d4f662fbf | ||
|
|
a5cac52e26 |
@@ -313,10 +313,10 @@ jobs:
|
||||
# Use tar to copy files matching the pattern, preserving the paths in the destination
|
||||
tar c \
|
||||
pg_install/v* \
|
||||
pg_install/build/*/src/test/regress/*.so \
|
||||
pg_install/build/*/src/test/regress/pg_regress \
|
||||
pg_install/build/*/src/test/isolation/isolationtester \
|
||||
pg_install/build/*/src/test/isolation/pg_isolation_regress \
|
||||
build/*/src/test/regress/*.so \
|
||||
build/*/src/test/regress/pg_regress \
|
||||
build/*/src/test/isolation/isolationtester \
|
||||
build/*/src/test/isolation/pg_isolation_regress \
|
||||
| tar x -C /tmp/neon
|
||||
|
||||
- name: Upload Neon artifact
|
||||
|
||||
16
.github/workflows/build-macos.yml
vendored
16
.github/workflows/build-macos.yml
vendored
@@ -110,7 +110,7 @@ jobs:
|
||||
|
||||
build-walproposer-lib:
|
||||
if: |
|
||||
inputs.pg_versions != '[]' || inputs.rebuild_everything ||
|
||||
contains(inputs.pg_versions, 'v17') || inputs.rebuild_everything ||
|
||||
contains(github.event.pull_request.labels.*.name, 'run-extra-build-macos') ||
|
||||
contains(github.event.pull_request.labels.*.name, 'run-extra-build-*') ||
|
||||
github.ref_name == 'main'
|
||||
@@ -144,7 +144,7 @@ jobs:
|
||||
id: cache_walproposer_lib
|
||||
uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3
|
||||
with:
|
||||
path: pg_install/build/walproposer-lib
|
||||
path: build/walproposer-lib
|
||||
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-walproposer_lib-v17-${{ steps.pg_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
|
||||
|
||||
- name: Checkout submodule vendor/postgres-v17
|
||||
@@ -169,11 +169,11 @@ jobs:
|
||||
run:
|
||||
make walproposer-lib -j$(sysctl -n hw.ncpu)
|
||||
|
||||
- name: Upload "pg_install/build/walproposer-lib" artifact
|
||||
- name: Upload "build/walproposer-lib" artifact
|
||||
uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4.6.2
|
||||
with:
|
||||
name: pg_install--build--walproposer-lib
|
||||
path: pg_install/build/walproposer-lib
|
||||
name: build--walproposer-lib
|
||||
path: build/walproposer-lib
|
||||
# The artifact is supposed to be used by the next job in the same workflow,
|
||||
# so there’s no need to store it for too long.
|
||||
retention-days: 1
|
||||
@@ -226,11 +226,11 @@ jobs:
|
||||
name: pg_install--v17
|
||||
path: pg_install/v17
|
||||
|
||||
- name: Download "pg_install/build/walproposer-lib" artifact
|
||||
- name: Download "build/walproposer-lib" artifact
|
||||
uses: actions/download-artifact@d3f86a106a0bac45b974a628896c90dbdf5c8093 # v4.3.0
|
||||
with:
|
||||
name: pg_install--build--walproposer-lib
|
||||
path: pg_install/build/walproposer-lib
|
||||
name: build--walproposer-lib
|
||||
path: build/walproposer-lib
|
||||
|
||||
# `actions/download-artifact` doesn't preserve permissions:
|
||||
# https://github.com/actions/download-artifact?tab=readme-ov-file#permission-loss
|
||||
|
||||
11
.github/workflows/large_oltp_benchmark.yml
vendored
11
.github/workflows/large_oltp_benchmark.yml
vendored
@@ -33,11 +33,19 @@ jobs:
|
||||
fail-fast: false # allow other variants to continue even if one fails
|
||||
matrix:
|
||||
include:
|
||||
# test only read-only custom scripts in new branch without database maintenance
|
||||
- target: new_branch
|
||||
custom_scripts: select_any_webhook_with_skew.sql@300 select_recent_webhook.sql@397 select_prefetch_webhook.sql@3
|
||||
test_maintenance: false
|
||||
# test all custom scripts in new branch with database maintenance
|
||||
- target: new_branch
|
||||
custom_scripts: insert_webhooks.sql@200 select_any_webhook_with_skew.sql@300 select_recent_webhook.sql@397 select_prefetch_webhook.sql@3 IUD_one_transaction.sql@100
|
||||
test_maintenance: true
|
||||
# test all custom scripts in reuse branch with database maintenance
|
||||
- target: reuse_branch
|
||||
custom_scripts: insert_webhooks.sql@200 select_any_webhook_with_skew.sql@300 select_recent_webhook.sql@397 select_prefetch_webhook.sql@3 IUD_one_transaction.sql@100
|
||||
max-parallel: 1 # we want to run each stripe size sequentially to be able to compare the results
|
||||
test_maintenance: true
|
||||
max-parallel: 1 # we want to run each benchmark sequentially to not have noisy neighbors on shared storage (PS, SK)
|
||||
permissions:
|
||||
contents: write
|
||||
statuses: write
|
||||
@@ -145,6 +153,7 @@ jobs:
|
||||
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
|
||||
|
||||
- name: Benchmark database maintenance
|
||||
if: ${{ matrix.test_maintenance == 'true' }}
|
||||
uses: ./.github/actions/run-python-test-set
|
||||
with:
|
||||
build_type: ${{ env.BUILD_TYPE }}
|
||||
|
||||
175
.github/workflows/large_oltp_growth.yml
vendored
Normal file
175
.github/workflows/large_oltp_growth.yml
vendored
Normal file
@@ -0,0 +1,175 @@
|
||||
name: large oltp growth
|
||||
# workflow to grow the reuse branch of large oltp benchmark continuously (about 16 GB per run)
|
||||
|
||||
on:
|
||||
# uncomment to run on push for debugging your PR
|
||||
# push:
|
||||
# branches: [ bodobolero/increase_large_oltp_workload ]
|
||||
|
||||
schedule:
|
||||
# * is a special character in YAML so you have to quote this string
|
||||
# ┌───────────── minute (0 - 59)
|
||||
# │ ┌───────────── hour (0 - 23)
|
||||
# │ │ ┌───────────── day of the month (1 - 31)
|
||||
# │ │ │ ┌───────────── month (1 - 12 or JAN-DEC)
|
||||
# │ │ │ │ ┌───────────── day of the week (0 - 6 or SUN-SAT)
|
||||
- cron: '0 6 * * *' # 06:00 UTC
|
||||
- cron: '0 8 * * *' # 08:00 UTC
|
||||
- cron: '0 10 * * *' # 10:00 UTC
|
||||
- cron: '0 12 * * *' # 12:00 UTC
|
||||
- cron: '0 14 * * *' # 14:00 UTC
|
||||
- cron: '0 16 * * *' # 16:00 UTC
|
||||
workflow_dispatch: # adds ability to run this manually
|
||||
|
||||
defaults:
|
||||
run:
|
||||
shell: bash -euxo pipefail {0}
|
||||
|
||||
concurrency:
|
||||
# Allow only one workflow globally because we need dedicated resources which only exist once
|
||||
group: large-oltp-growth
|
||||
cancel-in-progress: true
|
||||
|
||||
permissions:
|
||||
contents: read
|
||||
|
||||
jobs:
|
||||
oltp:
|
||||
strategy:
|
||||
fail-fast: false # allow other variants to continue even if one fails
|
||||
matrix:
|
||||
include:
|
||||
# for now only grow the reuse branch, not the other branches.
|
||||
- target: reuse_branch
|
||||
custom_scripts:
|
||||
- grow_action_blocks.sql
|
||||
- grow_action_kwargs.sql
|
||||
- grow_device_fingerprint_event.sql
|
||||
- grow_edges.sql
|
||||
- grow_hotel_rate_mapping.sql
|
||||
- grow_ocr_pipeline_results_version.sql
|
||||
- grow_priceline_raw_response.sql
|
||||
- grow_relabled_transactions.sql
|
||||
- grow_state_values.sql
|
||||
- grow_values.sql
|
||||
- grow_vertices.sql
|
||||
- update_accounting_coding_body_tracking_category_selection.sql
|
||||
- update_action_blocks.sql
|
||||
- update_action_kwargs.sql
|
||||
- update_denormalized_approval_workflow.sql
|
||||
- update_device_fingerprint_event.sql
|
||||
- update_edges.sql
|
||||
- update_heron_transaction_enriched_log.sql
|
||||
- update_heron_transaction_enrichment_requests.sql
|
||||
- update_hotel_rate_mapping.sql
|
||||
- update_incoming_webhooks.sql
|
||||
- update_manual_transaction.sql
|
||||
- update_ml_receipt_matching_log.sql
|
||||
- update_ocr_pipeine_results_version.sql
|
||||
- update_orc_pipeline_step_results.sql
|
||||
- update_orc_pipeline_step_results_version.sql
|
||||
- update_priceline_raw_response.sql
|
||||
- update_quickbooks_transactions.sql
|
||||
- update_raw_finicity_transaction.sql
|
||||
- update_relabeled_transactions.sql
|
||||
- update_state_values.sql
|
||||
- update_stripe_authorization_event_log.sql
|
||||
- update_transaction.sql
|
||||
- update_values.sql
|
||||
- update_vertices.sql
|
||||
max-parallel: 1 # we want to run each growth workload sequentially (for now there is just one)
|
||||
permissions:
|
||||
contents: write
|
||||
statuses: write
|
||||
id-token: write # aws-actions/configure-aws-credentials
|
||||
env:
|
||||
TEST_PG_BENCH_DURATIONS_MATRIX: "1h"
|
||||
TEST_PGBENCH_CUSTOM_SCRIPTS: ${{ join(matrix.custom_scripts, ' ') }}
|
||||
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
|
||||
PG_VERSION: 16 # dictated by the pre-determined project
|
||||
TEST_OUTPUT: /tmp/test_output
|
||||
BUILD_TYPE: remote
|
||||
PLATFORM: ${{ matrix.target }}
|
||||
|
||||
runs-on: [ self-hosted, us-east-2, x64 ]
|
||||
container:
|
||||
image: ghcr.io/neondatabase/build-tools:pinned-bookworm
|
||||
credentials:
|
||||
username: ${{ github.actor }}
|
||||
password: ${{ secrets.GITHUB_TOKEN }}
|
||||
options: --init
|
||||
|
||||
steps:
|
||||
- name: Harden the runner (Audit all outbound calls)
|
||||
uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
|
||||
with:
|
||||
egress-policy: audit
|
||||
|
||||
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
|
||||
|
||||
- name: Configure AWS credentials # necessary to download artefacts
|
||||
uses: aws-actions/configure-aws-credentials@e3dd6a429d7300a6a4c196c26e071d42e0343502 # v4.0.2
|
||||
with:
|
||||
aws-region: eu-central-1
|
||||
role-to-assume: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
|
||||
role-duration-seconds: 18000 # 5 hours is currently max associated with IAM role
|
||||
|
||||
- name: Download Neon artifact
|
||||
uses: ./.github/actions/download
|
||||
with:
|
||||
name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact
|
||||
path: /tmp/neon/
|
||||
prefix: latest
|
||||
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
|
||||
|
||||
- name: Set up Connection String
|
||||
id: set-up-connstr
|
||||
run: |
|
||||
case "${{ matrix.target }}" in
|
||||
reuse_branch)
|
||||
CONNSTR=${{ secrets.BENCHMARK_LARGE_OLTP_REUSE_CONNSTR }}
|
||||
;;
|
||||
*)
|
||||
echo >&2 "Unknown target=${{ matrix.target }}"
|
||||
exit 1
|
||||
;;
|
||||
esac
|
||||
|
||||
CONNSTR_WITHOUT_POOLER="${CONNSTR//-pooler/}"
|
||||
|
||||
echo "connstr=${CONNSTR}" >> $GITHUB_OUTPUT
|
||||
echo "connstr_without_pooler=${CONNSTR_WITHOUT_POOLER}" >> $GITHUB_OUTPUT
|
||||
|
||||
- name: pgbench with custom-scripts
|
||||
uses: ./.github/actions/run-python-test-set
|
||||
with:
|
||||
build_type: ${{ env.BUILD_TYPE }}
|
||||
test_selection: performance
|
||||
run_in_parallel: false
|
||||
save_perf_report: true
|
||||
extra_params: -m remote_cluster --timeout 7200 -k test_perf_oltp_large_tenant_growth
|
||||
pg_version: ${{ env.PG_VERSION }}
|
||||
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
|
||||
env:
|
||||
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
|
||||
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
|
||||
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
|
||||
|
||||
- name: Create Allure report
|
||||
id: create-allure-report
|
||||
if: ${{ !cancelled() }}
|
||||
uses: ./.github/actions/allure-report-generate
|
||||
with:
|
||||
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
|
||||
|
||||
- name: Post to a Slack channel
|
||||
if: ${{ github.event.schedule && failure() }}
|
||||
uses: slackapi/slack-github-action@fcfb566f8b0aab22203f066d80ca1d7e4b5d05b3 # v1.27.1
|
||||
with:
|
||||
channel-id: "C06KHQVQ7U3" # on-call-qa-staging-stream
|
||||
slack-message: |
|
||||
Periodic large oltp tenant growth increase: ${{ job.status }}
|
||||
<${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|GitHub Run>
|
||||
<${{ steps.create-allure-report.outputs.report-url }}|Allure report>
|
||||
env:
|
||||
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
|
||||
1
.gitignore
vendored
1
.gitignore
vendored
@@ -1,4 +1,5 @@
|
||||
/artifact_cache
|
||||
/build
|
||||
/pg_install
|
||||
/target
|
||||
/tmp_check
|
||||
|
||||
62
Cargo.lock
generated
62
Cargo.lock
generated
@@ -1235,6 +1235,25 @@ dependencies = [
|
||||
"replace_with",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "client_cache"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"bytes",
|
||||
"futures",
|
||||
"http 1.1.0",
|
||||
"hyper-util",
|
||||
"priority-queue",
|
||||
"rand 0.8.5",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"tonic 0.13.1",
|
||||
"tower 0.5.2",
|
||||
"uuid",
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "colorchoice"
|
||||
version = "1.0.0"
|
||||
@@ -4255,6 +4274,7 @@ dependencies = [
|
||||
"tokio-util",
|
||||
"tonic 0.13.1",
|
||||
"tracing",
|
||||
"url",
|
||||
"utils",
|
||||
"workspace_hack",
|
||||
]
|
||||
@@ -4334,6 +4354,7 @@ dependencies = [
|
||||
"postgres_backend",
|
||||
"postgres_connection",
|
||||
"postgres_ffi",
|
||||
"postgres_ffi_types",
|
||||
"postgres_initdb",
|
||||
"posthog_client_lite",
|
||||
"pprof",
|
||||
@@ -4403,7 +4424,7 @@ dependencies = [
|
||||
"nix 0.30.1",
|
||||
"once_cell",
|
||||
"postgres_backend",
|
||||
"postgres_ffi",
|
||||
"postgres_ffi_types",
|
||||
"rand 0.8.5",
|
||||
"remote_storage",
|
||||
"reqwest",
|
||||
@@ -4465,11 +4486,16 @@ dependencies = [
|
||||
name = "pageserver_page_api"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"bytes",
|
||||
"futures",
|
||||
"pageserver_api",
|
||||
"postgres_ffi",
|
||||
"prost 0.13.5",
|
||||
"strum",
|
||||
"strum_macros",
|
||||
"thiserror 1.0.69",
|
||||
"tokio",
|
||||
"tonic 0.13.1",
|
||||
"tonic-build",
|
||||
"utils",
|
||||
@@ -4889,6 +4915,7 @@ dependencies = [
|
||||
"memoffset 0.9.0",
|
||||
"once_cell",
|
||||
"postgres",
|
||||
"postgres_ffi_types",
|
||||
"pprof",
|
||||
"regex",
|
||||
"serde",
|
||||
@@ -4897,6 +4924,14 @@ dependencies = [
|
||||
"utils",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "postgres_ffi_types"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"thiserror 1.0.69",
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "postgres_initdb"
|
||||
version = "0.1.0"
|
||||
@@ -5013,6 +5048,17 @@ dependencies = [
|
||||
"elliptic-curve 0.13.8",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "priority-queue"
|
||||
version = "2.5.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5676d703dda103cbb035b653a9f11448c0a7216c7926bd35fcb5865475d0c970"
|
||||
dependencies = [
|
||||
"autocfg",
|
||||
"equivalent",
|
||||
"indexmap 2.9.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "proc-macro2"
|
||||
version = "1.0.94"
|
||||
@@ -5631,9 +5677,16 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "replace_with"
|
||||
version = "0.1.7"
|
||||
version = "0.1.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e3a8614ee435691de62bcffcf4a66d91b3594bf1428a5722e79103249a095690"
|
||||
checksum = "51743d3e274e2b18df81c4dc6caf8a5b8e15dbe799e0dca05c7617380094e884"
|
||||
|
||||
[[package]]
|
||||
name = "request_tracker"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "reqwest"
|
||||
@@ -7558,6 +7611,7 @@ dependencies = [
|
||||
"axum",
|
||||
"base64 0.22.1",
|
||||
"bytes",
|
||||
"flate2",
|
||||
"h2 0.4.4",
|
||||
"http 1.1.0",
|
||||
"http-body 1.0.0",
|
||||
@@ -7577,6 +7631,7 @@ dependencies = [
|
||||
"tower-layer",
|
||||
"tower-service",
|
||||
"tracing",
|
||||
"zstd",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -8156,6 +8211,7 @@ dependencies = [
|
||||
"futures",
|
||||
"pageserver_api",
|
||||
"postgres_ffi",
|
||||
"postgres_ffi_types",
|
||||
"pprof",
|
||||
"prost 0.13.5",
|
||||
"remote_storage",
|
||||
|
||||
@@ -8,8 +8,10 @@ members = [
|
||||
"pageserver/compaction",
|
||||
"pageserver/ctl",
|
||||
"pageserver/client",
|
||||
"pageserver/communicator_pools/client_cache",
|
||||
"pageserver/pagebench",
|
||||
"pageserver/page_api",
|
||||
"pageserver/communicator_pools/request_tracker",
|
||||
"proxy",
|
||||
"safekeeper",
|
||||
"safekeeper/client",
|
||||
@@ -22,6 +24,7 @@ members = [
|
||||
"libs/http-utils",
|
||||
"libs/pageserver_api",
|
||||
"libs/postgres_ffi",
|
||||
"libs/postgres_ffi_types",
|
||||
"libs/safekeeper_api",
|
||||
"libs/desim",
|
||||
"libs/neon-shmem",
|
||||
@@ -199,7 +202,7 @@ tokio-tar = "0.3"
|
||||
tokio-util = { version = "0.7.10", features = ["io", "rt"] }
|
||||
toml = "0.8"
|
||||
toml_edit = "0.22"
|
||||
tonic = { version = "0.13.1", default-features = false, features = ["channel", "codegen", "prost", "router", "server", "tls-ring", "tls-native-roots"] }
|
||||
tonic = { version = "0.13.1", default-features = false, features = ["channel", "codegen", "gzip", "prost", "router", "server", "tls-ring", "tls-native-roots", "zstd"] }
|
||||
tonic-reflection = { version = "0.13.1", features = ["server"] }
|
||||
tower = { version = "0.5.2", default-features = false }
|
||||
tower-http = { version = "0.6.2", features = ["auth", "request-id", "trace"] }
|
||||
@@ -256,9 +259,12 @@ pageserver_api = { version = "0.1", path = "./libs/pageserver_api/" }
|
||||
pageserver_client = { path = "./pageserver/client" }
|
||||
pageserver_compaction = { version = "0.1", path = "./pageserver/compaction/" }
|
||||
pageserver_page_api = { path = "./pageserver/page_api" }
|
||||
client_cache = { path = "./pageserver/communicator_pools/client_cache" }
|
||||
request_tracker = { path = "./pageserver/communicator_pools/request_tracker" }
|
||||
postgres_backend = { version = "0.1", path = "./libs/postgres_backend/" }
|
||||
postgres_connection = { version = "0.1", path = "./libs/postgres_connection/" }
|
||||
postgres_ffi = { version = "0.1", path = "./libs/postgres_ffi/" }
|
||||
postgres_ffi_types = { version = "0.1", path = "./libs/postgres_ffi_types/" }
|
||||
postgres_initdb = { path = "./libs/postgres_initdb" }
|
||||
posthog_client_lite = { version = "0.1", path = "./libs/posthog_client_lite" }
|
||||
pq_proto = { version = "0.1", path = "./libs/pq_proto/" }
|
||||
|
||||
@@ -5,8 +5,6 @@
|
||||
ARG REPOSITORY=ghcr.io/neondatabase
|
||||
ARG IMAGE=build-tools
|
||||
ARG TAG=pinned
|
||||
ARG DEFAULT_PG_VERSION=17
|
||||
ARG STABLE_PG_VERSION=16
|
||||
ARG DEBIAN_VERSION=bookworm
|
||||
ARG DEBIAN_FLAVOR=${DEBIAN_VERSION}-slim
|
||||
|
||||
@@ -47,7 +45,6 @@ COPY --chown=nonroot scripts/ninstall.sh scripts/ninstall.sh
|
||||
ENV BUILD_TYPE=release
|
||||
RUN set -e \
|
||||
&& mold -run make -j $(nproc) -s neon-pg-ext \
|
||||
&& rm -rf pg_install/build \
|
||||
&& tar -C pg_install -czf /home/nonroot/postgres_install.tar.gz .
|
||||
|
||||
# Prepare cargo-chef recipe
|
||||
@@ -63,14 +60,11 @@ FROM $REPOSITORY/$IMAGE:$TAG AS build
|
||||
WORKDIR /home/nonroot
|
||||
ARG GIT_VERSION=local
|
||||
ARG BUILD_TAG
|
||||
ARG STABLE_PG_VERSION
|
||||
|
||||
COPY --from=pg-build /home/nonroot/pg_install/v14/include/postgresql/server pg_install/v14/include/postgresql/server
|
||||
COPY --from=pg-build /home/nonroot/pg_install/v15/include/postgresql/server pg_install/v15/include/postgresql/server
|
||||
COPY --from=pg-build /home/nonroot/pg_install/v16/include/postgresql/server pg_install/v16/include/postgresql/server
|
||||
COPY --from=pg-build /home/nonroot/pg_install/v17/include/postgresql/server pg_install/v17/include/postgresql/server
|
||||
COPY --from=pg-build /home/nonroot/pg_install/v16/lib pg_install/v16/lib
|
||||
COPY --from=pg-build /home/nonroot/pg_install/v17/lib pg_install/v17/lib
|
||||
COPY --from=plan /home/nonroot/recipe.json recipe.json
|
||||
|
||||
ARG ADDITIONAL_RUSTFLAGS=""
|
||||
@@ -97,7 +91,6 @@ RUN set -e \
|
||||
# Build final image
|
||||
#
|
||||
FROM $BASE_IMAGE_SHA
|
||||
ARG DEFAULT_PG_VERSION
|
||||
WORKDIR /data
|
||||
|
||||
RUN set -e \
|
||||
@@ -107,8 +100,6 @@ RUN set -e \
|
||||
libreadline-dev \
|
||||
libseccomp-dev \
|
||||
ca-certificates \
|
||||
# System postgres for use with client libraries (e.g. in storage controller)
|
||||
postgresql-15 \
|
||||
openssl \
|
||||
unzip \
|
||||
curl \
|
||||
|
||||
145
Makefile
145
Makefile
@@ -1,8 +1,12 @@
|
||||
ROOT_PROJECT_DIR := $(dir $(abspath $(lastword $(MAKEFILE_LIST))))
|
||||
|
||||
# Where to install Postgres, default is ./pg_install, maybe useful for package managers
|
||||
# Where to install Postgres, default is ./pg_install, maybe useful for package
|
||||
# managers.
|
||||
POSTGRES_INSTALL_DIR ?= $(ROOT_PROJECT_DIR)/pg_install/
|
||||
|
||||
# All intermediate build artifacts are stored here.
|
||||
BUILD_DIR := build
|
||||
|
||||
ICU_PREFIX_DIR := /usr/local/icu
|
||||
|
||||
#
|
||||
@@ -104,21 +108,20 @@ cargo-target-dir:
|
||||
# Some rules are duplicated for Postgres v14 and 15. We may want to refactor
|
||||
# to avoid the duplication in the future, but it's tolerable for now.
|
||||
#
|
||||
$(POSTGRES_INSTALL_DIR)/build/%/config.status:
|
||||
|
||||
mkdir -p $(POSTGRES_INSTALL_DIR)
|
||||
test -e $(POSTGRES_INSTALL_DIR)/CACHEDIR.TAG || echo "$(CACHEDIR_TAG_CONTENTS)" > $(POSTGRES_INSTALL_DIR)/CACHEDIR.TAG
|
||||
$(BUILD_DIR)/%/config.status:
|
||||
mkdir -p $(BUILD_DIR)
|
||||
test -e $(BUILD_DIR)/CACHEDIR.TAG || echo "$(CACHEDIR_TAG_CONTENTS)" > $(BUILD_DIR)/CACHEDIR.TAG
|
||||
|
||||
+@echo "Configuring Postgres $* build"
|
||||
@test -s $(ROOT_PROJECT_DIR)/vendor/postgres-$*/configure || { \
|
||||
echo "\nPostgres submodule not found in $(ROOT_PROJECT_DIR)/vendor/postgres-$*/, execute "; \
|
||||
echo "'git submodule update --init --recursive --depth 2 --progress .' in project root.\n"; \
|
||||
exit 1; }
|
||||
mkdir -p $(POSTGRES_INSTALL_DIR)/build/$*
|
||||
mkdir -p $(BUILD_DIR)/$*
|
||||
|
||||
VERSION=$*; \
|
||||
EXTRA_VERSION=$$(cd $(ROOT_PROJECT_DIR)/vendor/postgres-$$VERSION && git rev-parse HEAD); \
|
||||
(cd $(POSTGRES_INSTALL_DIR)/build/$$VERSION && \
|
||||
(cd $(BUILD_DIR)/$$VERSION && \
|
||||
env PATH="$(EXTRA_PATH_OVERRIDES):$$PATH" $(ROOT_PROJECT_DIR)/vendor/postgres-$$VERSION/configure \
|
||||
CFLAGS='$(PG_CFLAGS)' LDFLAGS='$(PG_LDFLAGS)' \
|
||||
$(PG_CONFIGURE_OPTS) --with-extra-version=" ($$EXTRA_VERSION)" \
|
||||
@@ -130,96 +133,54 @@ $(POSTGRES_INSTALL_DIR)/build/%/config.status:
|
||||
# the "build-all-versions" entry points) where direct mention of PostgreSQL
|
||||
# versions is used.
|
||||
.PHONY: postgres-configure-v17
|
||||
postgres-configure-v17: $(POSTGRES_INSTALL_DIR)/build/v17/config.status
|
||||
postgres-configure-v17: $(BUILD_DIR)/v17/config.status
|
||||
.PHONY: postgres-configure-v16
|
||||
postgres-configure-v16: $(POSTGRES_INSTALL_DIR)/build/v16/config.status
|
||||
postgres-configure-v16: $(BUILD_DIR)/v16/config.status
|
||||
.PHONY: postgres-configure-v15
|
||||
postgres-configure-v15: $(POSTGRES_INSTALL_DIR)/build/v15/config.status
|
||||
postgres-configure-v15: $(BUILD_DIR)/v15/config.status
|
||||
.PHONY: postgres-configure-v14
|
||||
postgres-configure-v14: $(POSTGRES_INSTALL_DIR)/build/v14/config.status
|
||||
postgres-configure-v14: $(BUILD_DIR)/v14/config.status
|
||||
|
||||
# Install the PostgreSQL header files into $(POSTGRES_INSTALL_DIR)/<version>/include
|
||||
.PHONY: postgres-headers-%
|
||||
postgres-headers-%: postgres-configure-%
|
||||
+@echo "Installing PostgreSQL $* headers"
|
||||
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/src/include MAKELEVEL=0 install
|
||||
$(MAKE) -C $(BUILD_DIR)/$*/src/include MAKELEVEL=0 install
|
||||
|
||||
# Compile and install PostgreSQL
|
||||
.PHONY: postgres-%
|
||||
postgres-%: postgres-configure-% \
|
||||
postgres-headers-% # to prevent `make install` conflicts with neon's `postgres-headers`
|
||||
+@echo "Compiling PostgreSQL $*"
|
||||
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$* MAKELEVEL=0 install
|
||||
$(MAKE) -C $(BUILD_DIR)/$* MAKELEVEL=0 install
|
||||
+@echo "Compiling libpq $*"
|
||||
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/src/interfaces/libpq install
|
||||
$(MAKE) -C $(BUILD_DIR)/$*/src/interfaces/libpq install
|
||||
+@echo "Compiling pg_prewarm $*"
|
||||
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/pg_prewarm install
|
||||
$(MAKE) -C $(BUILD_DIR)/$*/contrib/pg_prewarm install
|
||||
+@echo "Compiling pg_buffercache $*"
|
||||
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/pg_buffercache install
|
||||
$(MAKE) -C $(BUILD_DIR)/$*/contrib/pg_buffercache install
|
||||
+@echo "Compiling pg_visibility $*"
|
||||
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/pg_visibility install
|
||||
$(MAKE) -C $(BUILD_DIR)/$*/contrib/pg_visibility install
|
||||
+@echo "Compiling pageinspect $*"
|
||||
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/pageinspect install
|
||||
$(MAKE) -C $(BUILD_DIR)/$*/contrib/pageinspect install
|
||||
+@echo "Compiling pg_trgm $*"
|
||||
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/pg_trgm install
|
||||
$(MAKE) -C $(BUILD_DIR)/$*/contrib/pg_trgm install
|
||||
+@echo "Compiling amcheck $*"
|
||||
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/amcheck install
|
||||
$(MAKE) -C $(BUILD_DIR)/$*/contrib/amcheck install
|
||||
+@echo "Compiling test_decoding $*"
|
||||
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/test_decoding install
|
||||
|
||||
.PHONY: postgres-clean-%
|
||||
postgres-clean-%:
|
||||
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$* MAKELEVEL=0 clean
|
||||
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/pg_buffercache clean
|
||||
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/pageinspect clean
|
||||
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/src/interfaces/libpq clean
|
||||
$(MAKE) -C $(BUILD_DIR)/$*/contrib/test_decoding install
|
||||
|
||||
.PHONY: postgres-check-%
|
||||
postgres-check-%: postgres-%
|
||||
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$* MAKELEVEL=0 check
|
||||
$(MAKE) -C $(BUILD_DIR)/$* MAKELEVEL=0 check
|
||||
|
||||
.PHONY: neon-pg-ext-%
|
||||
neon-pg-ext-%: postgres-%
|
||||
+@echo "Compiling neon $*"
|
||||
mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-$*
|
||||
+@echo "Compiling neon-specific Postgres extensions for $*"
|
||||
mkdir -p $(BUILD_DIR)/pgxn-$*
|
||||
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config COPT='$(COPT)' \
|
||||
-C $(POSTGRES_INSTALL_DIR)/build/neon-$* \
|
||||
-f $(ROOT_PROJECT_DIR)/pgxn/neon/Makefile install
|
||||
+@echo "Compiling neon_walredo $*"
|
||||
mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-walredo-$*
|
||||
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config COPT='$(COPT)' \
|
||||
-C $(POSTGRES_INSTALL_DIR)/build/neon-walredo-$* \
|
||||
-f $(ROOT_PROJECT_DIR)/pgxn/neon_walredo/Makefile install
|
||||
+@echo "Compiling neon_rmgr $*"
|
||||
mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-rmgr-$*
|
||||
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config COPT='$(COPT)' \
|
||||
-C $(POSTGRES_INSTALL_DIR)/build/neon-rmgr-$* \
|
||||
-f $(ROOT_PROJECT_DIR)/pgxn/neon_rmgr/Makefile install
|
||||
+@echo "Compiling neon_test_utils $*"
|
||||
mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-test-utils-$*
|
||||
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config COPT='$(COPT)' \
|
||||
-C $(POSTGRES_INSTALL_DIR)/build/neon-test-utils-$* \
|
||||
-f $(ROOT_PROJECT_DIR)/pgxn/neon_test_utils/Makefile install
|
||||
+@echo "Compiling neon_utils $*"
|
||||
mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-utils-$*
|
||||
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config COPT='$(COPT)' \
|
||||
-C $(POSTGRES_INSTALL_DIR)/build/neon-utils-$* \
|
||||
-f $(ROOT_PROJECT_DIR)/pgxn/neon_utils/Makefile install
|
||||
|
||||
.PHONY: neon-pg-clean-ext-%
|
||||
neon-pg-clean-ext-%:
|
||||
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config \
|
||||
-C $(POSTGRES_INSTALL_DIR)/build/neon-$* \
|
||||
-f $(ROOT_PROJECT_DIR)/pgxn/neon/Makefile clean
|
||||
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config \
|
||||
-C $(POSTGRES_INSTALL_DIR)/build/neon-walredo-$* \
|
||||
-f $(ROOT_PROJECT_DIR)/pgxn/neon_walredo/Makefile clean
|
||||
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config \
|
||||
-C $(POSTGRES_INSTALL_DIR)/build/neon-test-utils-$* \
|
||||
-f $(ROOT_PROJECT_DIR)/pgxn/neon_test_utils/Makefile clean
|
||||
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config \
|
||||
-C $(POSTGRES_INSTALL_DIR)/build/neon-utils-$* \
|
||||
-f $(ROOT_PROJECT_DIR)/pgxn/neon_utils/Makefile clean
|
||||
-C $(BUILD_DIR)/pgxn-$*\
|
||||
-f $(ROOT_PROJECT_DIR)/pgxn/Makefile install
|
||||
|
||||
# Build walproposer as a static library. walproposer source code is located
|
||||
# in the pgxn/neon directory.
|
||||
@@ -233,15 +194,15 @@ neon-pg-clean-ext-%:
|
||||
.PHONY: walproposer-lib
|
||||
walproposer-lib: neon-pg-ext-v17
|
||||
+@echo "Compiling walproposer-lib"
|
||||
mkdir -p $(POSTGRES_INSTALL_DIR)/build/walproposer-lib
|
||||
mkdir -p $(BUILD_DIR)/walproposer-lib
|
||||
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/v17/bin/pg_config COPT='$(COPT)' \
|
||||
-C $(POSTGRES_INSTALL_DIR)/build/walproposer-lib \
|
||||
-C $(BUILD_DIR)/walproposer-lib \
|
||||
-f $(ROOT_PROJECT_DIR)/pgxn/neon/Makefile walproposer-lib
|
||||
cp $(POSTGRES_INSTALL_DIR)/v17/lib/libpgport.a $(POSTGRES_INSTALL_DIR)/build/walproposer-lib
|
||||
cp $(POSTGRES_INSTALL_DIR)/v17/lib/libpgcommon.a $(POSTGRES_INSTALL_DIR)/build/walproposer-lib
|
||||
$(AR) d $(POSTGRES_INSTALL_DIR)/build/walproposer-lib/libpgport.a \
|
||||
cp $(POSTGRES_INSTALL_DIR)/v17/lib/libpgport.a $(BUILD_DIR)/walproposer-lib
|
||||
cp $(POSTGRES_INSTALL_DIR)/v17/lib/libpgcommon.a $(BUILD_DIR)/walproposer-lib
|
||||
$(AR) d $(BUILD_DIR)/walproposer-lib/libpgport.a \
|
||||
pg_strong_random.o
|
||||
$(AR) d $(POSTGRES_INSTALL_DIR)/build/walproposer-lib/libpgcommon.a \
|
||||
$(AR) d $(BUILD_DIR)/walproposer-lib/libpgcommon.a \
|
||||
checksum_helper.o \
|
||||
cryptohash_openssl.o \
|
||||
hmac_openssl.o \
|
||||
@@ -249,16 +210,10 @@ walproposer-lib: neon-pg-ext-v17
|
||||
parse_manifest.o \
|
||||
scram-common.o
|
||||
ifeq ($(UNAME_S),Linux)
|
||||
$(AR) d $(POSTGRES_INSTALL_DIR)/build/walproposer-lib/libpgcommon.a \
|
||||
$(AR) d $(BUILD_DIR)/walproposer-lib/libpgcommon.a \
|
||||
pg_crc32c.o
|
||||
endif
|
||||
|
||||
.PHONY: walproposer-lib-clean
|
||||
walproposer-lib-clean:
|
||||
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/v17/bin/pg_config \
|
||||
-C $(POSTGRES_INSTALL_DIR)/build/walproposer-lib \
|
||||
-f $(ROOT_PROJECT_DIR)/pgxn/neon/Makefile clean
|
||||
|
||||
.PHONY: neon-pg-ext
|
||||
neon-pg-ext: \
|
||||
neon-pg-ext-v14 \
|
||||
@@ -266,13 +221,6 @@ neon-pg-ext: \
|
||||
neon-pg-ext-v16 \
|
||||
neon-pg-ext-v17
|
||||
|
||||
.PHONY: neon-pg-clean-ext
|
||||
neon-pg-clean-ext: \
|
||||
neon-pg-clean-ext-v14 \
|
||||
neon-pg-clean-ext-v15 \
|
||||
neon-pg-clean-ext-v16 \
|
||||
neon-pg-clean-ext-v17
|
||||
|
||||
# shorthand to build all Postgres versions
|
||||
.PHONY: postgres
|
||||
postgres: \
|
||||
@@ -288,13 +236,6 @@ postgres-headers: \
|
||||
postgres-headers-v16 \
|
||||
postgres-headers-v17
|
||||
|
||||
.PHONY: postgres-clean
|
||||
postgres-clean: \
|
||||
postgres-clean-v14 \
|
||||
postgres-clean-v15 \
|
||||
postgres-clean-v16 \
|
||||
postgres-clean-v17
|
||||
|
||||
.PHONY: postgres-check
|
||||
postgres-check: \
|
||||
postgres-check-v14 \
|
||||
@@ -302,12 +243,6 @@ postgres-check: \
|
||||
postgres-check-v16 \
|
||||
postgres-check-v17
|
||||
|
||||
# This doesn't remove the effects of 'configure'.
|
||||
.PHONY: clean
|
||||
clean: postgres-clean neon-pg-clean-ext
|
||||
$(MAKE) -C compute clean
|
||||
$(CARGO_CMD_PREFIX) cargo clean
|
||||
|
||||
# This removes everything
|
||||
.PHONY: distclean
|
||||
distclean:
|
||||
@@ -320,7 +255,7 @@ fmt:
|
||||
|
||||
postgres-%-pg-bsd-indent: postgres-%
|
||||
+@echo "Compiling pg_bsd_indent"
|
||||
$(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/src/tools/pg_bsd_indent/
|
||||
$(MAKE) -C $(BUILD_DIR)/$*/src/tools/pg_bsd_indent/
|
||||
|
||||
# Create typedef list for the core. Note that generally it should be combined with
|
||||
# buildfarm one to cover platform specific stuff.
|
||||
@@ -339,7 +274,7 @@ postgres-%-pgindent: postgres-%-pg-bsd-indent postgres-%-typedefs.list
|
||||
cat $(ROOT_PROJECT_DIR)/vendor/postgres-$*/src/tools/pgindent/typedefs.list |\
|
||||
cat - postgres-$*-typedefs.list | sort | uniq > postgres-$*-typedefs-full.list
|
||||
+@echo note: you might want to run it on selected files/dirs instead.
|
||||
INDENT=$(POSTGRES_INSTALL_DIR)/build/$*/src/tools/pg_bsd_indent/pg_bsd_indent \
|
||||
INDENT=$(BUILD_DIR)/$*/src/tools/pg_bsd_indent/pg_bsd_indent \
|
||||
$(ROOT_PROJECT_DIR)/vendor/postgres-$*/src/tools/pgindent/pgindent --typedefs postgres-$*-typedefs-full.list \
|
||||
$(ROOT_PROJECT_DIR)/vendor/postgres-$*/src/ \
|
||||
--excludes $(ROOT_PROJECT_DIR)/vendor/postgres-$*/src/tools/pgindent/exclude_file_patterns
|
||||
@@ -350,9 +285,9 @@ postgres-%-pgindent: postgres-%-pg-bsd-indent postgres-%-typedefs.list
|
||||
neon-pgindent: postgres-v17-pg-bsd-indent neon-pg-ext-v17
|
||||
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/v17/bin/pg_config COPT='$(COPT)' \
|
||||
FIND_TYPEDEF=$(ROOT_PROJECT_DIR)/vendor/postgres-v17/src/tools/find_typedef \
|
||||
INDENT=$(POSTGRES_INSTALL_DIR)/build/v17/src/tools/pg_bsd_indent/pg_bsd_indent \
|
||||
INDENT=$(BUILD_DIR)/v17/src/tools/pg_bsd_indent/pg_bsd_indent \
|
||||
PGINDENT_SCRIPT=$(ROOT_PROJECT_DIR)/vendor/postgres-v17/src/tools/pgindent/pgindent \
|
||||
-C $(POSTGRES_INSTALL_DIR)/build/neon-v17 \
|
||||
-C $(BUILD_DIR)/neon-v17 \
|
||||
-f $(ROOT_PROJECT_DIR)/pgxn/neon/Makefile pgindent
|
||||
|
||||
|
||||
|
||||
@@ -149,8 +149,10 @@ RUN case $DEBIAN_VERSION in \
|
||||
ninja-build git autoconf automake libtool build-essential bison flex libreadline-dev \
|
||||
zlib1g-dev libxml2-dev libcurl4-openssl-dev libossp-uuid-dev wget ca-certificates pkg-config libssl-dev \
|
||||
libicu-dev libxslt1-dev liblz4-dev libzstd-dev zstd curl unzip g++ \
|
||||
libclang-dev \
|
||||
$VERSION_INSTALLS \
|
||||
&& apt clean && rm -rf /var/lib/apt/lists/*
|
||||
&& apt clean && rm -rf /var/lib/apt/lists/* && \
|
||||
useradd -ms /bin/bash nonroot -b /home
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
@@ -1057,17 +1059,10 @@ RUN make -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "pg build with nonroot user and cargo installed"
|
||||
# This layer is base and common for layers with `pgrx`
|
||||
# Layer "build-deps with Rust toolchain installed"
|
||||
#
|
||||
#########################################################################################
|
||||
FROM pg-build AS pg-build-nonroot-with-cargo
|
||||
ARG PG_VERSION
|
||||
|
||||
RUN apt update && \
|
||||
apt install --no-install-recommends --no-install-suggests -y curl libclang-dev && \
|
||||
apt clean && rm -rf /var/lib/apt/lists/* && \
|
||||
useradd -ms /bin/bash nonroot -b /home
|
||||
FROM build-deps AS build-deps-with-cargo
|
||||
|
||||
ENV HOME=/home/nonroot
|
||||
ENV PATH="/home/nonroot/.cargo/bin:$PATH"
|
||||
@@ -1082,13 +1077,29 @@ RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux
|
||||
./rustup-init -y --no-modify-path --profile minimal --default-toolchain stable && \
|
||||
rm rustup-init
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "pg-build with Rust toolchain installed"
|
||||
# This layer is base and common for layers with `pgrx`
|
||||
#
|
||||
#########################################################################################
|
||||
FROM pg-build AS pg-build-with-cargo
|
||||
ARG PG_VERSION
|
||||
|
||||
ENV HOME=/home/nonroot
|
||||
ENV PATH="/home/nonroot/.cargo/bin:$PATH"
|
||||
USER nonroot
|
||||
WORKDIR /home/nonroot
|
||||
|
||||
COPY --from=build-deps-with-cargo /home/nonroot /home/nonroot
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "rust extensions"
|
||||
# This layer is used to build `pgrx` deps
|
||||
#
|
||||
#########################################################################################
|
||||
FROM pg-build-nonroot-with-cargo AS rust-extensions-build
|
||||
FROM pg-build-with-cargo AS rust-extensions-build
|
||||
ARG PG_VERSION
|
||||
|
||||
RUN case "${PG_VERSION:?}" in \
|
||||
@@ -1110,7 +1121,7 @@ USER root
|
||||
# and eventually get merged with `rust-extensions-build`
|
||||
#
|
||||
#########################################################################################
|
||||
FROM pg-build-nonroot-with-cargo AS rust-extensions-build-pgrx12
|
||||
FROM pg-build-with-cargo AS rust-extensions-build-pgrx12
|
||||
ARG PG_VERSION
|
||||
|
||||
RUN cargo install --locked --version 0.12.9 cargo-pgrx && \
|
||||
@@ -1127,7 +1138,7 @@ USER root
|
||||
# and eventually get merged with `rust-extensions-build`
|
||||
#
|
||||
#########################################################################################
|
||||
FROM pg-build-nonroot-with-cargo AS rust-extensions-build-pgrx14
|
||||
FROM pg-build-with-cargo AS rust-extensions-build-pgrx14
|
||||
ARG PG_VERSION
|
||||
|
||||
RUN cargo install --locked --version 0.14.1 cargo-pgrx && \
|
||||
@@ -1144,10 +1155,12 @@ USER root
|
||||
|
||||
FROM build-deps AS pgrag-src
|
||||
ARG PG_VERSION
|
||||
|
||||
WORKDIR /ext-src
|
||||
COPY compute/patches/onnxruntime.patch .
|
||||
|
||||
RUN wget https://github.com/microsoft/onnxruntime/archive/refs/tags/v1.18.1.tar.gz -O onnxruntime.tar.gz && \
|
||||
mkdir onnxruntime-src && cd onnxruntime-src && tar xzf ../onnxruntime.tar.gz --strip-components=1 -C . && \
|
||||
patch -p1 < /ext-src/onnxruntime.patch && \
|
||||
echo "#nothing to test here" > neon-test.sh
|
||||
|
||||
RUN wget https://github.com/neondatabase-labs/pgrag/archive/refs/tags/v0.1.2.tar.gz -O pgrag.tar.gz && \
|
||||
@@ -1621,18 +1634,7 @@ FROM pg-build AS neon-ext-build
|
||||
ARG PG_VERSION
|
||||
|
||||
COPY pgxn/ pgxn/
|
||||
RUN make -j $(getconf _NPROCESSORS_ONLN) \
|
||||
-C pgxn/neon \
|
||||
-s install && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) \
|
||||
-C pgxn/neon_utils \
|
||||
-s install && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) \
|
||||
-C pgxn/neon_test_utils \
|
||||
-s install && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) \
|
||||
-C pgxn/neon_rmgr \
|
||||
-s install
|
||||
RUN make -j $(getconf _NPROCESSORS_ONLN) -C pgxn -s install-compute
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
@@ -1722,29 +1724,11 @@ FROM extensions-${EXTENSIONS} AS neon-pg-ext-build
|
||||
# Compile the Neon-specific `compute_ctl`, `fast_import`, and `local_proxy` binaries
|
||||
#
|
||||
#########################################################################################
|
||||
FROM $REPOSITORY/$IMAGE:$TAG AS compute-tools-plan
|
||||
ARG BUILD_TAG
|
||||
ENV BUILD_TAG=$BUILD_TAG
|
||||
|
||||
WORKDIR /home/nonroot
|
||||
USER nonroot
|
||||
|
||||
# Copy entire project to get Cargo.* files with proper dependencies for the whole project
|
||||
COPY --chown=nonroot . .
|
||||
RUN cargo chef prepare --recipe-path recipe.json
|
||||
|
||||
FROM $REPOSITORY/$IMAGE:$TAG AS compute-tools
|
||||
ARG BUILD_TAG
|
||||
ENV BUILD_TAG=$BUILD_TAG
|
||||
|
||||
USER nonroot
|
||||
|
||||
COPY --from=compute-tools-plan /home/nonroot/recipe.json recipe.json
|
||||
RUN --mount=type=cache,uid=1000,target=/home/nonroot/.cargo/registry \
|
||||
--mount=type=cache,uid=1000,target=/home/nonroot/.cargo/git \
|
||||
--mount=type=cache,uid=1000,target=/home/nonroot/target \
|
||||
mold -run cargo chef cook --locked --profile release-line-debug-size-lto --recipe-path recipe.json
|
||||
|
||||
# Copy entire project to get Cargo.* files with proper dependencies for the whole project
|
||||
COPY --chown=nonroot . .
|
||||
RUN --mount=type=cache,uid=1000,target=/home/nonroot/.cargo/registry \
|
||||
|
||||
@@ -21,6 +21,8 @@ unix_socket_dir=/tmp/
|
||||
unix_socket_mode=0777
|
||||
; required for pgbouncer_exporter
|
||||
ignore_startup_parameters=extra_float_digits
|
||||
; pidfile for graceful termination
|
||||
pidfile=/tmp/pgbouncer.pid
|
||||
|
||||
;; Disable connection logging. It produces a lot of logs that no one looks at,
|
||||
;; and we can get similar log entries from the proxy too. We had incidents in
|
||||
|
||||
15
compute/patches/onnxruntime.patch
Normal file
15
compute/patches/onnxruntime.patch
Normal file
@@ -0,0 +1,15 @@
|
||||
diff --git a/cmake/deps.txt b/cmake/deps.txt
|
||||
index d213b09034..229de2ebf0 100644
|
||||
--- a/cmake/deps.txt
|
||||
+++ b/cmake/deps.txt
|
||||
@@ -22,7 +22,9 @@ dlpack;https://github.com/dmlc/dlpack/archive/refs/tags/v0.6.zip;4d565dd2e5b3132
|
||||
# it contains changes on top of 3.4.0 which are required to fix build issues.
|
||||
# Until the 3.4.1 release this is the best option we have.
|
||||
# Issue link: https://gitlab.com/libeigen/eigen/-/issues/2744
|
||||
-eigen;https://gitlab.com/libeigen/eigen/-/archive/e7248b26a1ed53fa030c5c459f7ea095dfd276ac/eigen-e7248b26a1ed53fa030c5c459f7ea095dfd276ac.zip;be8be39fdbc6e60e94fa7870b280707069b5b81a
|
||||
+# Moved to github mirror to avoid gitlab issues.Add commentMore actions
|
||||
+# Issue link: https://github.com/bazelbuild/bazel-central-registry/issues/4355
|
||||
+eigen;https://github.com/eigen-mirror/eigen/archive/e7248b26a1ed53fa030c5c459f7ea095dfd276ac/eigen-e7248b26a1ed53fa030c5c459f7ea095dfd276ac.zip;61418a349000ba7744a3ad03cf5071f22ebf860a
|
||||
flatbuffers;https://github.com/google/flatbuffers/archive/refs/tags/v23.5.26.zip;59422c3b5e573dd192fead2834d25951f1c1670c
|
||||
fp16;https://github.com/Maratyszcza/FP16/archive/0a92994d729ff76a58f692d3028ca1b64b145d91.zip;b985f6985a05a1c03ff1bb71190f66d8f98a1494
|
||||
fxdiv;https://github.com/Maratyszcza/FXdiv/archive/63058eff77e11aa15bf531df5dd34395ec3017c8.zip;a5658f4036402dbca7cebee32be57fb8149811e1
|
||||
@@ -124,6 +124,10 @@ struct Cli {
|
||||
/// Interval in seconds for collecting installed extensions statistics
|
||||
#[arg(long, default_value = "3600")]
|
||||
pub installed_extensions_collection_interval: u64,
|
||||
|
||||
/// Run in development mode, skipping VM-specific operations like process termination
|
||||
#[arg(long, action = clap::ArgAction::SetTrue)]
|
||||
pub dev: bool,
|
||||
}
|
||||
|
||||
impl Cli {
|
||||
@@ -159,7 +163,7 @@ fn main() -> Result<()> {
|
||||
.build()?;
|
||||
let _rt_guard = runtime.enter();
|
||||
|
||||
runtime.block_on(init())?;
|
||||
runtime.block_on(init(cli.dev))?;
|
||||
|
||||
// enable core dumping for all child processes
|
||||
setrlimit(Resource::CORE, rlimit::INFINITY, rlimit::INFINITY)?;
|
||||
@@ -198,13 +202,13 @@ fn main() -> Result<()> {
|
||||
deinit_and_exit(exit_code);
|
||||
}
|
||||
|
||||
async fn init() -> Result<()> {
|
||||
async fn init(dev_mode: bool) -> Result<()> {
|
||||
init_tracing_and_logging(DEFAULT_LOG_LEVEL).await?;
|
||||
|
||||
let mut signals = Signals::new([SIGINT, SIGTERM, SIGQUIT])?;
|
||||
thread::spawn(move || {
|
||||
for sig in signals.forever() {
|
||||
handle_exit_signal(sig);
|
||||
handle_exit_signal(sig, dev_mode);
|
||||
}
|
||||
});
|
||||
|
||||
@@ -263,9 +267,9 @@ fn deinit_and_exit(exit_code: Option<i32>) -> ! {
|
||||
/// When compute_ctl is killed, send also termination signal to sync-safekeepers
|
||||
/// to prevent leakage. TODO: it is better to convert compute_ctl to async and
|
||||
/// wait for termination which would be easy then.
|
||||
fn handle_exit_signal(sig: i32) {
|
||||
fn handle_exit_signal(sig: i32, dev_mode: bool) {
|
||||
info!("received {sig} termination signal");
|
||||
forward_termination_signal();
|
||||
forward_termination_signal(dev_mode);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
|
||||
@@ -35,6 +35,7 @@ use url::Url;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
use utils::measured_stream::MeasuredReader;
|
||||
use utils::pid_file;
|
||||
|
||||
use crate::configurator::launch_configurator;
|
||||
use crate::disk_quota::set_disk_quota;
|
||||
@@ -44,6 +45,7 @@ use crate::lsn_lease::launch_lsn_lease_bg_task_for_static;
|
||||
use crate::metrics::COMPUTE_CTL_UP;
|
||||
use crate::monitor::launch_monitor;
|
||||
use crate::pg_helpers::*;
|
||||
use crate::pgbouncer::*;
|
||||
use crate::rsyslog::{
|
||||
PostgresLogsRsyslogConfig, configure_audit_rsyslog, configure_postgres_logs_export,
|
||||
launch_pgaudit_gc,
|
||||
@@ -161,6 +163,10 @@ pub struct ComputeState {
|
||||
pub lfc_prewarm_state: LfcPrewarmState,
|
||||
pub lfc_offload_state: LfcOffloadState,
|
||||
|
||||
/// WAL flush LSN that is set after terminating Postgres and syncing safekeepers if
|
||||
/// mode == ComputeMode::Primary. None otherwise
|
||||
pub terminate_flush_lsn: Option<Lsn>,
|
||||
|
||||
pub metrics: ComputeMetrics,
|
||||
}
|
||||
|
||||
@@ -176,6 +182,7 @@ impl ComputeState {
|
||||
metrics: ComputeMetrics::default(),
|
||||
lfc_prewarm_state: LfcPrewarmState::default(),
|
||||
lfc_offload_state: LfcOffloadState::default(),
|
||||
terminate_flush_lsn: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -215,6 +222,46 @@ pub struct ParsedSpec {
|
||||
pub endpoint_storage_token: Option<String>,
|
||||
}
|
||||
|
||||
impl ParsedSpec {
|
||||
pub fn validate(&self) -> Result<(), String> {
|
||||
// Only Primary nodes are using safekeeper_connstrings, and at the moment
|
||||
// this method only validates that part of the specs.
|
||||
if self.spec.mode != ComputeMode::Primary {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// While it seems like a good idea to check for an odd number of entries in
|
||||
// the safekeepers connection string, changes to the list of safekeepers might
|
||||
// incur appending a new server to a list of 3, in which case a list of 4
|
||||
// entries is okay in production.
|
||||
//
|
||||
// Still we want unique entries, and at least one entry in the vector
|
||||
if self.safekeeper_connstrings.is_empty() {
|
||||
return Err(String::from("safekeeper_connstrings is empty"));
|
||||
}
|
||||
|
||||
// check for uniqueness of the connection strings in the set
|
||||
let mut connstrings = self.safekeeper_connstrings.clone();
|
||||
|
||||
connstrings.sort();
|
||||
let mut previous = &connstrings[0];
|
||||
|
||||
for current in connstrings.iter().skip(1) {
|
||||
// duplicate entry?
|
||||
if current == previous {
|
||||
return Err(format!(
|
||||
"duplicate entry in safekeeper_connstrings: {}!",
|
||||
current,
|
||||
));
|
||||
}
|
||||
|
||||
previous = current;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<ComputeSpec> for ParsedSpec {
|
||||
type Error = String;
|
||||
fn try_from(spec: ComputeSpec) -> Result<Self, String> {
|
||||
@@ -244,6 +291,7 @@ impl TryFrom<ComputeSpec> for ParsedSpec {
|
||||
} else {
|
||||
spec.safekeeper_connstrings.clone()
|
||||
};
|
||||
|
||||
let storage_auth_token = spec.storage_auth_token.clone();
|
||||
let tenant_id: TenantId = if let Some(tenant_id) = spec.tenant_id {
|
||||
tenant_id
|
||||
@@ -278,7 +326,7 @@ impl TryFrom<ComputeSpec> for ParsedSpec {
|
||||
.clone()
|
||||
.or_else(|| spec.cluster.settings.find("neon.endpoint_storage_token"));
|
||||
|
||||
Ok(ParsedSpec {
|
||||
let res = ParsedSpec {
|
||||
spec,
|
||||
pageserver_connstr,
|
||||
safekeeper_connstrings,
|
||||
@@ -287,7 +335,11 @@ impl TryFrom<ComputeSpec> for ParsedSpec {
|
||||
timeline_id,
|
||||
endpoint_storage_addr,
|
||||
endpoint_storage_token,
|
||||
})
|
||||
};
|
||||
|
||||
// Now check validity of the parsed specification
|
||||
res.validate()?;
|
||||
Ok(res)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -356,7 +408,9 @@ impl ComputeNode {
|
||||
// N.B. keep it in sync with `ZENITH_OPTIONS` in `get_maintenance_client()`.
|
||||
const EXTRA_OPTIONS: &str = "-c role=cloud_admin -c default_transaction_read_only=off -c search_path=public -c statement_timeout=0";
|
||||
let options = match conn_conf.get_options() {
|
||||
Some(options) => format!("{} {}", options, EXTRA_OPTIONS),
|
||||
// Allow the control plane to override any options set by the
|
||||
// compute
|
||||
Some(options) => format!("{} {}", EXTRA_OPTIONS, options),
|
||||
None => EXTRA_OPTIONS.to_string(),
|
||||
};
|
||||
conn_conf.options(&options);
|
||||
@@ -484,12 +538,21 @@ impl ComputeNode {
|
||||
// Reap the postgres process
|
||||
delay_exit |= this.cleanup_after_postgres_exit()?;
|
||||
|
||||
// /terminate returns LSN. If we don't sleep at all, connection will break and we
|
||||
// won't get result. If we sleep too much, tests will take significantly longer
|
||||
// and Github Action run will error out
|
||||
let sleep_duration = if delay_exit {
|
||||
Duration::from_secs(30)
|
||||
} else {
|
||||
Duration::from_millis(300)
|
||||
};
|
||||
|
||||
// If launch failed, keep serving HTTP requests for a while, so the cloud
|
||||
// control plane can get the actual error.
|
||||
if delay_exit {
|
||||
info!("giving control plane 30s to collect the error before shutdown");
|
||||
std::thread::sleep(Duration::from_secs(30));
|
||||
}
|
||||
std::thread::sleep(sleep_duration);
|
||||
Ok(exit_code)
|
||||
}
|
||||
|
||||
@@ -861,20 +924,25 @@ impl ComputeNode {
|
||||
// Maybe sync safekeepers again, to speed up next startup
|
||||
let compute_state = self.state.lock().unwrap().clone();
|
||||
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
|
||||
if matches!(pspec.spec.mode, compute_api::spec::ComputeMode::Primary) {
|
||||
let lsn = if matches!(pspec.spec.mode, compute_api::spec::ComputeMode::Primary) {
|
||||
info!("syncing safekeepers on shutdown");
|
||||
let storage_auth_token = pspec.storage_auth_token.clone();
|
||||
let lsn = self.sync_safekeepers(storage_auth_token)?;
|
||||
info!("synced safekeepers at lsn {lsn}");
|
||||
}
|
||||
info!(%lsn, "synced safekeepers");
|
||||
Some(lsn)
|
||||
} else {
|
||||
info!("not primary, not syncing safekeepers");
|
||||
None
|
||||
};
|
||||
|
||||
let mut delay_exit = false;
|
||||
let mut state = self.state.lock().unwrap();
|
||||
if state.status == ComputeStatus::TerminationPending {
|
||||
state.terminate_flush_lsn = lsn;
|
||||
if let ComputeStatus::TerminationPending { mode } = state.status {
|
||||
state.status = ComputeStatus::Terminated;
|
||||
self.state_changed.notify_all();
|
||||
// we were asked to terminate gracefully, don't exit to avoid restart
|
||||
delay_exit = true
|
||||
delay_exit = mode == compute_api::responses::TerminateMode::Fast
|
||||
}
|
||||
drop(state);
|
||||
|
||||
@@ -1745,7 +1813,7 @@ impl ComputeNode {
|
||||
|
||||
// exit loop
|
||||
ComputeStatus::Failed
|
||||
| ComputeStatus::TerminationPending
|
||||
| ComputeStatus::TerminationPending { .. }
|
||||
| ComputeStatus::Terminated => break 'cert_update,
|
||||
|
||||
// wait
|
||||
@@ -2246,12 +2314,68 @@ pub async fn installed_extensions(conf: tokio_postgres::Config) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn forward_termination_signal() {
|
||||
pub fn forward_termination_signal(dev_mode: bool) {
|
||||
let ss_pid = SYNC_SAFEKEEPERS_PID.load(Ordering::SeqCst);
|
||||
if ss_pid != 0 {
|
||||
let ss_pid = nix::unistd::Pid::from_raw(ss_pid as i32);
|
||||
kill(ss_pid, Signal::SIGTERM).ok();
|
||||
}
|
||||
|
||||
if !dev_mode {
|
||||
// Terminate pgbouncer with SIGKILL
|
||||
match pid_file::read(PGBOUNCER_PIDFILE.into()) {
|
||||
Ok(pid_file::PidFileRead::LockedByOtherProcess(pid)) => {
|
||||
info!("sending SIGKILL to pgbouncer process pid: {}", pid);
|
||||
if let Err(e) = kill(pid, Signal::SIGKILL) {
|
||||
error!("failed to terminate pgbouncer: {}", e);
|
||||
}
|
||||
}
|
||||
// pgbouncer does not lock the pid file, so we read and kill the process directly
|
||||
Ok(pid_file::PidFileRead::NotHeldByAnyProcess(_)) => {
|
||||
if let Ok(pid_str) = std::fs::read_to_string(PGBOUNCER_PIDFILE) {
|
||||
if let Ok(pid) = pid_str.trim().parse::<i32>() {
|
||||
info!(
|
||||
"sending SIGKILL to pgbouncer process pid: {} (from unlocked pid file)",
|
||||
pid
|
||||
);
|
||||
if let Err(e) = kill(Pid::from_raw(pid), Signal::SIGKILL) {
|
||||
error!("failed to terminate pgbouncer: {}", e);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
info!("pgbouncer pid file exists but process not running");
|
||||
}
|
||||
}
|
||||
Ok(pid_file::PidFileRead::NotExist) => {
|
||||
info!("pgbouncer pid file not found, process may not be running");
|
||||
}
|
||||
Err(e) => {
|
||||
error!("error reading pgbouncer pid file: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
// Terminate local_proxy
|
||||
match pid_file::read("/etc/local_proxy/pid".into()) {
|
||||
Ok(pid_file::PidFileRead::LockedByOtherProcess(pid)) => {
|
||||
info!("sending SIGTERM to local_proxy process pid: {}", pid);
|
||||
if let Err(e) = kill(pid, Signal::SIGTERM) {
|
||||
error!("failed to terminate local_proxy: {}", e);
|
||||
}
|
||||
}
|
||||
Ok(pid_file::PidFileRead::NotHeldByAnyProcess(_)) => {
|
||||
info!("local_proxy PID file exists but process not running");
|
||||
}
|
||||
Ok(pid_file::PidFileRead::NotExist) => {
|
||||
info!("local_proxy PID file not found, process may not be running");
|
||||
}
|
||||
Err(e) => {
|
||||
error!("error reading local_proxy PID file: {}", e);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
info!("Skipping pgbouncer and local_proxy termination because in dev mode");
|
||||
}
|
||||
|
||||
let pg_pid = PG_PID.load(Ordering::SeqCst);
|
||||
if pg_pid != 0 {
|
||||
let pg_pid = nix::unistd::Pid::from_raw(pg_pid as i32);
|
||||
@@ -2284,3 +2408,21 @@ impl<T: 'static> JoinSetExt<T> for tokio::task::JoinSet<T> {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::fs::File;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn duplicate_safekeeper_connstring() {
|
||||
let file = File::open("tests/cluster_spec.json").unwrap();
|
||||
let spec: ComputeSpec = serde_json::from_reader(file).unwrap();
|
||||
|
||||
match ParsedSpec::try_from(spec.clone()) {
|
||||
Ok(_p) => panic!("Failed to detect duplicate entry"),
|
||||
Err(e) => assert!(e.starts_with("duplicate entry in safekeeper_connstrings:")),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,32 +1,42 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::compute::{ComputeNode, forward_termination_signal};
|
||||
use crate::http::JsonResponse;
|
||||
use axum::extract::State;
|
||||
use axum::response::{IntoResponse, Response};
|
||||
use compute_api::responses::ComputeStatus;
|
||||
use axum::response::Response;
|
||||
use axum_extra::extract::OptionalQuery;
|
||||
use compute_api::responses::{ComputeStatus, TerminateResponse};
|
||||
use http::StatusCode;
|
||||
use serde::Deserialize;
|
||||
use std::sync::Arc;
|
||||
use tokio::task;
|
||||
use tracing::info;
|
||||
|
||||
use crate::compute::{ComputeNode, forward_termination_signal};
|
||||
use crate::http::JsonResponse;
|
||||
#[derive(Deserialize, Default)]
|
||||
pub struct TerminateQuery {
|
||||
mode: compute_api::responses::TerminateMode,
|
||||
}
|
||||
|
||||
/// Terminate the compute.
|
||||
pub(in crate::http) async fn terminate(State(compute): State<Arc<ComputeNode>>) -> Response {
|
||||
pub(in crate::http) async fn terminate(
|
||||
State(compute): State<Arc<ComputeNode>>,
|
||||
OptionalQuery(terminate): OptionalQuery<TerminateQuery>,
|
||||
) -> Response {
|
||||
let mode = terminate.unwrap_or_default().mode;
|
||||
{
|
||||
let mut state = compute.state.lock().unwrap();
|
||||
if state.status == ComputeStatus::Terminated {
|
||||
return StatusCode::CREATED.into_response();
|
||||
return JsonResponse::success(StatusCode::CREATED, state.terminate_flush_lsn);
|
||||
}
|
||||
|
||||
if !matches!(state.status, ComputeStatus::Empty | ComputeStatus::Running) {
|
||||
return JsonResponse::invalid_status(state.status);
|
||||
}
|
||||
|
||||
state.set_status(ComputeStatus::TerminationPending, &compute.state_changed);
|
||||
drop(state);
|
||||
state.set_status(
|
||||
ComputeStatus::TerminationPending { mode },
|
||||
&compute.state_changed,
|
||||
);
|
||||
}
|
||||
|
||||
forward_termination_signal();
|
||||
forward_termination_signal(false);
|
||||
info!("sent signal and notified waiters");
|
||||
|
||||
// Spawn a blocking thread to wait for compute to become Terminated.
|
||||
@@ -34,7 +44,7 @@ pub(in crate::http) async fn terminate(State(compute): State<Arc<ComputeNode>>)
|
||||
// be able to serve other requests while some particular request
|
||||
// is waiting for compute to finish configuration.
|
||||
let c = compute.clone();
|
||||
task::spawn_blocking(move || {
|
||||
let lsn = task::spawn_blocking(move || {
|
||||
let mut state = c.state.lock().unwrap();
|
||||
while state.status != ComputeStatus::Terminated {
|
||||
state = c.state_changed.wait(state).unwrap();
|
||||
@@ -44,11 +54,10 @@ pub(in crate::http) async fn terminate(State(compute): State<Arc<ComputeNode>>)
|
||||
state.status
|
||||
);
|
||||
}
|
||||
state.terminate_flush_lsn
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
info!("terminated Postgres");
|
||||
|
||||
StatusCode::OK.into_response()
|
||||
JsonResponse::success(StatusCode::OK, TerminateResponse { lsn })
|
||||
}
|
||||
|
||||
@@ -22,6 +22,7 @@ mod migration;
|
||||
pub mod monitor;
|
||||
pub mod params;
|
||||
pub mod pg_helpers;
|
||||
pub mod pgbouncer;
|
||||
pub mod rsyslog;
|
||||
pub mod spec;
|
||||
mod spec_apply;
|
||||
|
||||
@@ -83,7 +83,9 @@ impl ComputeMonitor {
|
||||
let compute_status = self.compute.get_status();
|
||||
if matches!(
|
||||
compute_status,
|
||||
ComputeStatus::Terminated | ComputeStatus::TerminationPending | ComputeStatus::Failed
|
||||
ComputeStatus::Terminated
|
||||
| ComputeStatus::TerminationPending { .. }
|
||||
| ComputeStatus::Failed
|
||||
) {
|
||||
info!(
|
||||
"compute is in {} status, stopping compute monitor",
|
||||
|
||||
1
compute_tools/src/pgbouncer.rs
Normal file
1
compute_tools/src/pgbouncer.rs
Normal file
@@ -0,0 +1 @@
|
||||
pub const PGBOUNCER_PIDFILE: &str = "/tmp/pgbouncer.pid";
|
||||
6
compute_tools/tests/README.md
Normal file
6
compute_tools/tests/README.md
Normal file
@@ -0,0 +1,6 @@
|
||||
### Test files
|
||||
|
||||
The file `cluster_spec.json` has been copied over from libs/compute_api
|
||||
tests, with some edits:
|
||||
|
||||
- the neon.safekeepers setting contains a duplicate value
|
||||
245
compute_tools/tests/cluster_spec.json
Normal file
245
compute_tools/tests/cluster_spec.json
Normal file
@@ -0,0 +1,245 @@
|
||||
{
|
||||
"format_version": 1.0,
|
||||
|
||||
"timestamp": "2021-05-23T18:25:43.511Z",
|
||||
"operation_uuid": "0f657b36-4b0f-4a2d-9c2e-1dcd615e7d8b",
|
||||
|
||||
"cluster": {
|
||||
"cluster_id": "test-cluster-42",
|
||||
"name": "Zenith Test",
|
||||
"state": "restarted",
|
||||
"roles": [
|
||||
{
|
||||
"name": "postgres",
|
||||
"encrypted_password": "6b1d16b78004bbd51fa06af9eda75972",
|
||||
"options": null
|
||||
},
|
||||
{
|
||||
"name": "alexk",
|
||||
"encrypted_password": null,
|
||||
"options": null
|
||||
},
|
||||
{
|
||||
"name": "zenith \"new\"",
|
||||
"encrypted_password": "5b1d16b78004bbd51fa06af9eda75972",
|
||||
"options": null
|
||||
},
|
||||
{
|
||||
"name": "zen",
|
||||
"encrypted_password": "9b1d16b78004bbd51fa06af9eda75972"
|
||||
},
|
||||
{
|
||||
"name": "\"name\";\\n select 1;",
|
||||
"encrypted_password": "5b1d16b78004bbd51fa06af9eda75972"
|
||||
},
|
||||
{
|
||||
"name": "MyRole",
|
||||
"encrypted_password": "5b1d16b78004bbd51fa06af9eda75972"
|
||||
}
|
||||
],
|
||||
"databases": [
|
||||
{
|
||||
"name": "DB2",
|
||||
"owner": "alexk",
|
||||
"options": [
|
||||
{
|
||||
"name": "LC_COLLATE",
|
||||
"value": "C",
|
||||
"vartype": "string"
|
||||
},
|
||||
{
|
||||
"name": "LC_CTYPE",
|
||||
"value": "C",
|
||||
"vartype": "string"
|
||||
},
|
||||
{
|
||||
"name": "TEMPLATE",
|
||||
"value": "template0",
|
||||
"vartype": "enum"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "zenith",
|
||||
"owner": "MyRole"
|
||||
},
|
||||
{
|
||||
"name": "zen",
|
||||
"owner": "zen"
|
||||
}
|
||||
],
|
||||
"settings": [
|
||||
{
|
||||
"name": "fsync",
|
||||
"value": "off",
|
||||
"vartype": "bool"
|
||||
},
|
||||
{
|
||||
"name": "wal_level",
|
||||
"value": "logical",
|
||||
"vartype": "enum"
|
||||
},
|
||||
{
|
||||
"name": "hot_standby",
|
||||
"value": "on",
|
||||
"vartype": "bool"
|
||||
},
|
||||
{
|
||||
"name": "prewarm_lfc_on_startup",
|
||||
"value": "off",
|
||||
"vartype": "bool"
|
||||
},
|
||||
{
|
||||
"name": "neon.safekeepers",
|
||||
"value": "127.0.0.1:6502,127.0.0.1:6503,127.0.0.1:6501,127.0.0.1:6502",
|
||||
"vartype": "string"
|
||||
},
|
||||
{
|
||||
"name": "wal_log_hints",
|
||||
"value": "on",
|
||||
"vartype": "bool"
|
||||
},
|
||||
{
|
||||
"name": "log_connections",
|
||||
"value": "on",
|
||||
"vartype": "bool"
|
||||
},
|
||||
{
|
||||
"name": "shared_buffers",
|
||||
"value": "32768",
|
||||
"vartype": "integer"
|
||||
},
|
||||
{
|
||||
"name": "port",
|
||||
"value": "55432",
|
||||
"vartype": "integer"
|
||||
},
|
||||
{
|
||||
"name": "max_connections",
|
||||
"value": "100",
|
||||
"vartype": "integer"
|
||||
},
|
||||
{
|
||||
"name": "max_wal_senders",
|
||||
"value": "10",
|
||||
"vartype": "integer"
|
||||
},
|
||||
{
|
||||
"name": "listen_addresses",
|
||||
"value": "0.0.0.0",
|
||||
"vartype": "string"
|
||||
},
|
||||
{
|
||||
"name": "wal_sender_timeout",
|
||||
"value": "0",
|
||||
"vartype": "integer"
|
||||
},
|
||||
{
|
||||
"name": "password_encryption",
|
||||
"value": "md5",
|
||||
"vartype": "enum"
|
||||
},
|
||||
{
|
||||
"name": "maintenance_work_mem",
|
||||
"value": "65536",
|
||||
"vartype": "integer"
|
||||
},
|
||||
{
|
||||
"name": "max_parallel_workers",
|
||||
"value": "8",
|
||||
"vartype": "integer"
|
||||
},
|
||||
{
|
||||
"name": "max_worker_processes",
|
||||
"value": "8",
|
||||
"vartype": "integer"
|
||||
},
|
||||
{
|
||||
"name": "neon.tenant_id",
|
||||
"value": "b0554b632bd4d547a63b86c3630317e8",
|
||||
"vartype": "string"
|
||||
},
|
||||
{
|
||||
"name": "max_replication_slots",
|
||||
"value": "10",
|
||||
"vartype": "integer"
|
||||
},
|
||||
{
|
||||
"name": "neon.timeline_id",
|
||||
"value": "2414a61ffc94e428f14b5758fe308e13",
|
||||
"vartype": "string"
|
||||
},
|
||||
{
|
||||
"name": "shared_preload_libraries",
|
||||
"value": "neon",
|
||||
"vartype": "string"
|
||||
},
|
||||
{
|
||||
"name": "synchronous_standby_names",
|
||||
"value": "walproposer",
|
||||
"vartype": "string"
|
||||
},
|
||||
{
|
||||
"name": "neon.pageserver_connstring",
|
||||
"value": "host=127.0.0.1 port=6400",
|
||||
"vartype": "string"
|
||||
},
|
||||
{
|
||||
"name": "test.escaping",
|
||||
"value": "here's a backslash \\ and a quote ' and a double-quote \" hooray",
|
||||
"vartype": "string"
|
||||
}
|
||||
]
|
||||
},
|
||||
"delta_operations": [
|
||||
{
|
||||
"action": "delete_db",
|
||||
"name": "zenith_test"
|
||||
},
|
||||
{
|
||||
"action": "rename_db",
|
||||
"name": "DB",
|
||||
"new_name": "DB2"
|
||||
},
|
||||
{
|
||||
"action": "delete_role",
|
||||
"name": "zenith2"
|
||||
},
|
||||
{
|
||||
"action": "rename_role",
|
||||
"name": "zenith new",
|
||||
"new_name": "zenith \"new\""
|
||||
}
|
||||
],
|
||||
"remote_extensions": {
|
||||
"library_index": {
|
||||
"postgis-3": "postgis",
|
||||
"libpgrouting-3.4": "postgis",
|
||||
"postgis_raster-3": "postgis",
|
||||
"postgis_sfcgal-3": "postgis",
|
||||
"postgis_topology-3": "postgis",
|
||||
"address_standardizer-3": "postgis"
|
||||
},
|
||||
"extension_data": {
|
||||
"postgis": {
|
||||
"archive_path": "5834329303/v15/extensions/postgis.tar.zst",
|
||||
"control_data": {
|
||||
"postgis.control": "# postgis extension\ncomment = ''PostGIS geometry and geography spatial types and functions''\ndefault_version = ''3.3.2''\nmodule_pathname = ''$libdir/postgis-3''\nrelocatable = false\ntrusted = true\n",
|
||||
"pgrouting.control": "# pgRouting Extension\ncomment = ''pgRouting Extension''\ndefault_version = ''3.4.2''\nmodule_pathname = ''$libdir/libpgrouting-3.4''\nrelocatable = true\nrequires = ''plpgsql''\nrequires = ''postgis''\ntrusted = true\n",
|
||||
"postgis_raster.control": "# postgis_raster extension\ncomment = ''PostGIS raster types and functions''\ndefault_version = ''3.3.2''\nmodule_pathname = ''$libdir/postgis_raster-3''\nrelocatable = false\nrequires = postgis\ntrusted = true\n",
|
||||
"postgis_sfcgal.control": "# postgis topology extension\ncomment = ''PostGIS SFCGAL functions''\ndefault_version = ''3.3.2''\nrelocatable = true\nrequires = postgis\ntrusted = true\n",
|
||||
"postgis_topology.control": "# postgis topology extension\ncomment = ''PostGIS topology spatial types and functions''\ndefault_version = ''3.3.2''\nrelocatable = false\nschema = topology\nrequires = postgis\ntrusted = true\n",
|
||||
"address_standardizer.control": "# address_standardizer extension\ncomment = ''Used to parse an address into constituent elements. Generally used to support geocoding address normalization step.''\ndefault_version = ''3.3.2''\nrelocatable = true\ntrusted = true\n",
|
||||
"postgis_tiger_geocoder.control": "# postgis tiger geocoder extension\ncomment = ''PostGIS tiger geocoder and reverse geocoder''\ndefault_version = ''3.3.2''\nrelocatable = false\nschema = tiger\nrequires = ''postgis,fuzzystrmatch''\nsuperuser= false\ntrusted = true\n",
|
||||
"address_standardizer_data_us.control": "# address standardizer us dataset\ncomment = ''Address Standardizer US dataset example''\ndefault_version = ''3.3.2''\nrelocatable = true\ntrusted = true\n"
|
||||
}
|
||||
}
|
||||
},
|
||||
"custom_extensions": [],
|
||||
"public_extensions": ["postgis"]
|
||||
},
|
||||
"pgbouncer_settings": {
|
||||
"default_pool_size": "42",
|
||||
"pool_mode": "session"
|
||||
}
|
||||
}
|
||||
@@ -18,7 +18,7 @@ use clap::Parser;
|
||||
use compute_api::requests::ComputeClaimsScope;
|
||||
use compute_api::spec::ComputeMode;
|
||||
use control_plane::broker::StorageBroker;
|
||||
use control_plane::endpoint::{ComputeControlPlane, PageserverProtocol};
|
||||
use control_plane::endpoint::{ComputeControlPlane, EndpointTerminateMode, PageserverProtocol};
|
||||
use control_plane::endpoint_storage::{ENDPOINT_STORAGE_DEFAULT_ADDR, EndpointStorage};
|
||||
use control_plane::local_env;
|
||||
use control_plane::local_env::{
|
||||
@@ -672,6 +672,13 @@ struct EndpointStartCmdArgs {
|
||||
#[clap(short = 't', long, value_parser= humantime::parse_duration, help = "timeout until we fail the command")]
|
||||
#[arg(default_value = "90s")]
|
||||
start_timeout: Duration,
|
||||
|
||||
#[clap(
|
||||
long,
|
||||
help = "Run in development mode, skipping VM-specific operations like process termination",
|
||||
action = clap::ArgAction::SetTrue
|
||||
)]
|
||||
dev: bool,
|
||||
}
|
||||
|
||||
#[derive(clap::Args)]
|
||||
@@ -704,10 +711,9 @@ struct EndpointStopCmdArgs {
|
||||
)]
|
||||
destroy: bool,
|
||||
|
||||
#[clap(long, help = "Postgres shutdown mode, passed to \"pg_ctl -m <mode>\"")]
|
||||
#[arg(value_parser(["smart", "fast", "immediate"]))]
|
||||
#[arg(default_value = "fast")]
|
||||
mode: String,
|
||||
#[clap(long, help = "Postgres shutdown mode")]
|
||||
#[clap(default_value = "fast")]
|
||||
mode: EndpointTerminateMode,
|
||||
}
|
||||
|
||||
#[derive(clap::Args)]
|
||||
@@ -1590,6 +1596,7 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
|
||||
stripe_size.0 as usize,
|
||||
args.create_test_user,
|
||||
args.start_timeout,
|
||||
args.dev,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
@@ -1650,7 +1657,10 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
|
||||
.endpoints
|
||||
.get(endpoint_id)
|
||||
.with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
|
||||
endpoint.stop(&args.mode, args.destroy)?;
|
||||
match endpoint.stop(args.mode, args.destroy).await?.lsn {
|
||||
Some(lsn) => println!("{lsn}"),
|
||||
None => println!("null"),
|
||||
}
|
||||
}
|
||||
EndpointCmd::GenerateJwt(args) => {
|
||||
let endpoint = {
|
||||
@@ -2082,11 +2092,16 @@ async fn handle_stop_all(args: &StopCmdArgs, env: &local_env::LocalEnv) -> Resul
|
||||
}
|
||||
|
||||
async fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
|
||||
let mode = if immediate {
|
||||
EndpointTerminateMode::Immediate
|
||||
} else {
|
||||
EndpointTerminateMode::Fast
|
||||
};
|
||||
// Stop all endpoints
|
||||
match ComputeControlPlane::load(env.clone()) {
|
||||
Ok(cplane) => {
|
||||
for (_k, node) in cplane.endpoints {
|
||||
if let Err(e) = node.stop(if immediate { "immediate" } else { "fast" }, false) {
|
||||
if let Err(e) = node.stop(mode, false).await {
|
||||
eprintln!("postgres stop failed: {e:#}");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -52,7 +52,8 @@ use compute_api::requests::{
|
||||
COMPUTE_AUDIENCE, ComputeClaims, ComputeClaimsScope, ConfigurationRequest,
|
||||
};
|
||||
use compute_api::responses::{
|
||||
ComputeConfig, ComputeCtlConfig, ComputeStatus, ComputeStatusResponse, TlsConfig,
|
||||
ComputeConfig, ComputeCtlConfig, ComputeStatus, ComputeStatusResponse, TerminateResponse,
|
||||
TlsConfig,
|
||||
};
|
||||
use compute_api::spec::{
|
||||
Cluster, ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, Database, PgIdent,
|
||||
@@ -341,13 +342,33 @@ pub enum EndpointStatus {
|
||||
|
||||
impl Display for EndpointStatus {
|
||||
fn fmt(&self, writer: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
let s = match self {
|
||||
writer.write_str(match self {
|
||||
Self::Running => "running",
|
||||
Self::Stopped => "stopped",
|
||||
Self::Crashed => "crashed",
|
||||
Self::RunningNoPidfile => "running, no pidfile",
|
||||
};
|
||||
write!(writer, "{}", s)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default, Clone, Copy, clap::ValueEnum)]
|
||||
pub enum EndpointTerminateMode {
|
||||
#[default]
|
||||
/// Use pg_ctl stop -m fast
|
||||
Fast,
|
||||
/// Use pg_ctl stop -m immediate
|
||||
Immediate,
|
||||
/// Use /terminate?mode=immediate
|
||||
ImmediateTerminate,
|
||||
}
|
||||
|
||||
impl std::fmt::Display for EndpointTerminateMode {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.write_str(match &self {
|
||||
EndpointTerminateMode::Fast => "fast",
|
||||
EndpointTerminateMode::Immediate => "immediate",
|
||||
EndpointTerminateMode::ImmediateTerminate => "immediate-terminate",
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -691,6 +712,7 @@ impl Endpoint {
|
||||
shard_stripe_size: usize,
|
||||
create_test_user: bool,
|
||||
start_timeout: Duration,
|
||||
dev: bool,
|
||||
) -> Result<()> {
|
||||
if self.status() == EndpointStatus::Running {
|
||||
anyhow::bail!("The endpoint is already running");
|
||||
@@ -861,6 +883,10 @@ impl Endpoint {
|
||||
cmd.args(["--remote-ext-base-url", remote_ext_base_url]);
|
||||
}
|
||||
|
||||
if dev {
|
||||
cmd.arg("--dev");
|
||||
}
|
||||
|
||||
let child = cmd.spawn()?;
|
||||
// set up a scopeguard to kill & wait for the child in case we panic or bail below
|
||||
let child = scopeguard::guard(child, |mut child| {
|
||||
@@ -913,7 +939,7 @@ impl Endpoint {
|
||||
ComputeStatus::Empty
|
||||
| ComputeStatus::ConfigurationPending
|
||||
| ComputeStatus::Configuration
|
||||
| ComputeStatus::TerminationPending
|
||||
| ComputeStatus::TerminationPending { .. }
|
||||
| ComputeStatus::Terminated => {
|
||||
bail!("unexpected compute status: {:?}", state.status)
|
||||
}
|
||||
@@ -1035,8 +1061,27 @@ impl Endpoint {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn stop(&self, mode: &str, destroy: bool) -> Result<()> {
|
||||
self.pg_ctl(&["-m", mode, "stop"], &None)?;
|
||||
pub async fn stop(
|
||||
&self,
|
||||
mode: EndpointTerminateMode,
|
||||
destroy: bool,
|
||||
) -> Result<TerminateResponse> {
|
||||
// pg_ctl stop is fast but doesn't allow us to collect LSN. /terminate is
|
||||
// slow, and test runs time out. Solution: special mode "immediate-terminate"
|
||||
// which uses /terminate
|
||||
let response = if let EndpointTerminateMode::ImmediateTerminate = mode {
|
||||
let ip = self.external_http_address.ip();
|
||||
let port = self.external_http_address.port();
|
||||
let url = format!("http://{ip}:{port}/terminate?mode=immediate");
|
||||
let token = self.generate_jwt(Some(ComputeClaimsScope::Admin))?;
|
||||
let request = reqwest::Client::new().post(url).bearer_auth(token);
|
||||
let response = request.send().await.context("/terminate")?;
|
||||
let text = response.text().await.context("/terminate result")?;
|
||||
serde_json::from_str(&text).with_context(|| format!("deserializing {text}"))?
|
||||
} else {
|
||||
self.pg_ctl(&["-m", &mode.to_string(), "stop"], &None)?;
|
||||
TerminateResponse { lsn: None }
|
||||
};
|
||||
|
||||
// Also wait for the compute_ctl process to die. It might have some
|
||||
// cleanup work to do after postgres stops, like syncing safekeepers,
|
||||
@@ -1046,7 +1091,7 @@ impl Endpoint {
|
||||
// waiting. Sometimes we do *not* want this cleanup: tests intentionally
|
||||
// do stop when majority of safekeepers is down, so sync-safekeepers
|
||||
// would hang otherwise. This could be a separate flag though.
|
||||
let send_sigterm = destroy || mode == "immediate";
|
||||
let send_sigterm = destroy || !matches!(mode, EndpointTerminateMode::Fast);
|
||||
self.wait_for_compute_ctl_to_exit(send_sigterm)?;
|
||||
if destroy {
|
||||
println!(
|
||||
@@ -1055,7 +1100,7 @@ impl Endpoint {
|
||||
);
|
||||
std::fs::remove_dir_all(self.endpoint_path())?;
|
||||
}
|
||||
Ok(())
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
pub fn connstr(&self, user: &str, db_name: &str) -> String {
|
||||
|
||||
@@ -209,6 +209,8 @@ pub struct NeonStorageControllerConf {
|
||||
pub use_https_safekeeper_api: bool,
|
||||
|
||||
pub use_local_compute_notifications: bool,
|
||||
|
||||
pub timeline_safekeeper_count: Option<i64>,
|
||||
}
|
||||
|
||||
impl NeonStorageControllerConf {
|
||||
@@ -236,9 +238,10 @@ impl Default for NeonStorageControllerConf {
|
||||
heartbeat_interval: Self::DEFAULT_HEARTBEAT_INTERVAL,
|
||||
long_reconcile_threshold: None,
|
||||
use_https_pageserver_api: false,
|
||||
timelines_onto_safekeepers: false,
|
||||
timelines_onto_safekeepers: true,
|
||||
use_https_safekeeper_api: false,
|
||||
use_local_compute_notifications: true,
|
||||
timeline_safekeeper_count: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -628,6 +628,10 @@ impl StorageController {
|
||||
args.push("--timelines-onto-safekeepers".to_string());
|
||||
}
|
||||
|
||||
if let Some(sk_cnt) = self.config.timeline_safekeeper_count {
|
||||
args.push(format!("--timeline-safekeeper-count={sk_cnt}"));
|
||||
}
|
||||
|
||||
println!("Starting storage controller");
|
||||
|
||||
background_process::start_process(
|
||||
|
||||
@@ -95,3 +95,4 @@ echo "Start compute node"
|
||||
-b /usr/local/bin/postgres \
|
||||
--compute-id "compute-${RANDOM}" \
|
||||
--config "${CONFIG_FILE}"
|
||||
--dev
|
||||
|
||||
@@ -31,13 +31,12 @@ struct Args {
|
||||
}
|
||||
|
||||
#[derive(serde::Deserialize)]
|
||||
#[serde(tag = "type")]
|
||||
struct Config {
|
||||
#[serde(default = "listen")]
|
||||
listen: std::net::SocketAddr,
|
||||
pemfile: camino::Utf8PathBuf,
|
||||
#[serde(flatten)]
|
||||
storage_config: remote_storage::RemoteStorageConfig,
|
||||
storage_kind: remote_storage::TypedRemoteStorageKind,
|
||||
#[serde(default = "max_upload_file_limit")]
|
||||
max_upload_file_limit: usize,
|
||||
}
|
||||
@@ -70,7 +69,8 @@ async fn main() -> anyhow::Result<()> {
|
||||
let listener = tokio::net::TcpListener::bind(config.listen).await.unwrap();
|
||||
info!("listening on {}", listener.local_addr().unwrap());
|
||||
|
||||
let storage = remote_storage::GenericRemoteStorage::from_config(&config.storage_config).await?;
|
||||
let storage =
|
||||
remote_storage::GenericRemoteStorage::from_storage_kind(config.storage_kind).await?;
|
||||
let cancel = tokio_util::sync::CancellationToken::new();
|
||||
if !args.no_s3_check_on_startup {
|
||||
app::check_storage_permissions(&storage, cancel.clone()).await?;
|
||||
|
||||
@@ -16,6 +16,7 @@ pub static COMPUTE_AUDIENCE: &str = "compute";
|
||||
pub enum ComputeClaimsScope {
|
||||
/// An admin-scoped token allows access to all of `compute_ctl`'s authorized
|
||||
/// facilities.
|
||||
#[serde(rename = "compute_ctl:admin")]
|
||||
Admin,
|
||||
}
|
||||
|
||||
@@ -24,7 +25,7 @@ impl FromStr for ComputeClaimsScope {
|
||||
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
match s {
|
||||
"admin" => Ok(ComputeClaimsScope::Admin),
|
||||
"compute_ctl:admin" => Ok(ComputeClaimsScope::Admin),
|
||||
_ => Err(anyhow::anyhow!("invalid compute claims scope \"{s}\"")),
|
||||
}
|
||||
}
|
||||
@@ -80,3 +81,23 @@ pub struct SetRoleGrantsRequest {
|
||||
pub privileges: Vec<Privilege>,
|
||||
pub role: PgIdent,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use std::str::FromStr;
|
||||
|
||||
use crate::requests::ComputeClaimsScope;
|
||||
|
||||
/// Confirm that whether we parse the scope by string or through serde, the
|
||||
/// same values parse to the same enum variant.
|
||||
#[test]
|
||||
fn compute_request_scopes() {
|
||||
const ADMIN_SCOPE: &str = "compute_ctl:admin";
|
||||
|
||||
let from_serde: ComputeClaimsScope =
|
||||
serde_json::from_str(&format!("\"{ADMIN_SCOPE}\"")).unwrap();
|
||||
let from_str = ComputeClaimsScope::from_str(ADMIN_SCOPE).unwrap();
|
||||
|
||||
assert_eq!(from_serde, from_str);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -83,6 +83,16 @@ pub struct ComputeStatusResponse {
|
||||
pub error: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Clone, Copy, Debug, Deserialize, PartialEq, Eq, Default)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum TerminateMode {
|
||||
#[default]
|
||||
/// wait 30s till returning from /terminate to allow control plane to get the error
|
||||
Fast,
|
||||
/// return from /terminate immediately as soon as all components are terminated
|
||||
Immediate,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Clone, Copy, Debug, Deserialize, PartialEq, Eq)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum ComputeStatus {
|
||||
@@ -103,11 +113,16 @@ pub enum ComputeStatus {
|
||||
// control-plane to terminate it.
|
||||
Failed,
|
||||
// Termination requested
|
||||
TerminationPending,
|
||||
TerminationPending { mode: TerminateMode },
|
||||
// Terminated Postgres
|
||||
Terminated,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize)]
|
||||
pub struct TerminateResponse {
|
||||
pub lsn: Option<utils::lsn::Lsn>,
|
||||
}
|
||||
|
||||
impl Display for ComputeStatus {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
@@ -117,7 +132,7 @@ impl Display for ComputeStatus {
|
||||
ComputeStatus::Running => f.write_str("running"),
|
||||
ComputeStatus::Configuration => f.write_str("configuration"),
|
||||
ComputeStatus::Failed => f.write_str("failed"),
|
||||
ComputeStatus::TerminationPending => f.write_str("termination-pending"),
|
||||
ComputeStatus::TerminationPending { .. } => f.write_str("termination-pending"),
|
||||
ComputeStatus::Terminated => f.write_str("terminated"),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -419,13 +419,13 @@ pub fn now() -> u64 {
|
||||
with_thread_context(|ctx| ctx.clock.get().unwrap().now())
|
||||
}
|
||||
|
||||
pub fn exit(code: i32, msg: String) {
|
||||
pub fn exit(code: i32, msg: String) -> ! {
|
||||
with_thread_context(|ctx| {
|
||||
ctx.allow_panic.store(true, Ordering::SeqCst);
|
||||
let mut result = ctx.result.lock();
|
||||
*result = (code, msg);
|
||||
panic!("exit");
|
||||
});
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn get_thread_ctx() -> Arc<ThreadContext> {
|
||||
|
||||
@@ -17,7 +17,7 @@ anyhow.workspace = true
|
||||
bytes.workspace = true
|
||||
byteorder.workspace = true
|
||||
utils.workspace = true
|
||||
postgres_ffi.workspace = true
|
||||
postgres_ffi_types.workspace = true
|
||||
enum-map.workspace = true
|
||||
strum.workspace = true
|
||||
strum_macros.workspace = true
|
||||
|
||||
@@ -816,7 +816,7 @@ pub mod tenant_conf_defaults {
|
||||
// By default ingest enough WAL for two new L0 layers before checking if new image
|
||||
// image layers should be created.
|
||||
pub const DEFAULT_IMAGE_LAYER_CREATION_CHECK_THRESHOLD: u8 = 2;
|
||||
pub const DEFAULT_GC_COMPACTION_ENABLED: bool = false;
|
||||
pub const DEFAULT_GC_COMPACTION_ENABLED: bool = true;
|
||||
pub const DEFAULT_GC_COMPACTION_VERIFICATION: bool = true;
|
||||
pub const DEFAULT_GC_COMPACTION_INITIAL_THRESHOLD_KB: u64 = 5 * 1024 * 1024; // 5GB
|
||||
pub const DEFAULT_GC_COMPACTION_RATIO_PERCENT: u64 = 100;
|
||||
|
||||
@@ -4,8 +4,8 @@ use std::ops::Range;
|
||||
use anyhow::{Result, bail};
|
||||
use byteorder::{BE, ByteOrder};
|
||||
use bytes::Bytes;
|
||||
use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
|
||||
use postgres_ffi::{Oid, RepOriginId};
|
||||
use postgres_ffi_types::forknum::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
|
||||
use postgres_ffi_types::{Oid, RepOriginId};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use utils::const_assert;
|
||||
|
||||
@@ -194,7 +194,7 @@ impl Key {
|
||||
/// will be rejected on the write path.
|
||||
#[allow(dead_code)]
|
||||
pub fn is_valid_key_on_write_path_strong(&self) -> bool {
|
||||
use postgres_ffi::pg_constants::{DEFAULTTABLESPACE_OID, GLOBALTABLESPACE_OID};
|
||||
use postgres_ffi_types::constants::{DEFAULTTABLESPACE_OID, GLOBALTABLESPACE_OID};
|
||||
if !self.is_i128_representable() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
use std::ops::Range;
|
||||
|
||||
use itertools::Itertools;
|
||||
use postgres_ffi::BLCKSZ;
|
||||
|
||||
use crate::key::Key;
|
||||
use crate::shard::{ShardCount, ShardIdentity};
|
||||
@@ -269,9 +268,13 @@ impl KeySpace {
|
||||
/// Partition a key space into roughly chunks of roughly 'target_size' bytes
|
||||
/// in each partition.
|
||||
///
|
||||
pub fn partition(&self, shard_identity: &ShardIdentity, target_size: u64) -> KeyPartitioning {
|
||||
// Assume that each value is 8k in size.
|
||||
let target_nblocks = (target_size / BLCKSZ as u64) as u32;
|
||||
pub fn partition(
|
||||
&self,
|
||||
shard_identity: &ShardIdentity,
|
||||
target_size: u64,
|
||||
block_size: u64,
|
||||
) -> KeyPartitioning {
|
||||
let target_nblocks = (target_size / block_size) as u32;
|
||||
|
||||
let mut parts = Vec::new();
|
||||
let mut current_part = Vec::new();
|
||||
|
||||
@@ -5,11 +5,10 @@ pub mod controller_api;
|
||||
pub mod key;
|
||||
pub mod keyspace;
|
||||
pub mod models;
|
||||
pub mod record;
|
||||
pub mod pagestream_api;
|
||||
pub mod reltag;
|
||||
pub mod shard;
|
||||
/// Public API types
|
||||
pub mod upcall_api;
|
||||
pub mod value;
|
||||
|
||||
pub mod config;
|
||||
|
||||
@@ -5,16 +5,12 @@ pub mod utilization;
|
||||
use core::ops::Range;
|
||||
use std::collections::HashMap;
|
||||
use std::fmt::Display;
|
||||
use std::io::{BufRead, Read};
|
||||
use std::num::{NonZeroU32, NonZeroU64, NonZeroUsize};
|
||||
use std::str::FromStr;
|
||||
use std::time::{Duration, SystemTime};
|
||||
|
||||
use byteorder::{BigEndian, ReadBytesExt};
|
||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||
#[cfg(feature = "testing")]
|
||||
use camino::Utf8PathBuf;
|
||||
use postgres_ffi::BLCKSZ;
|
||||
use serde::{Deserialize, Deserializer, Serialize, Serializer};
|
||||
use serde_with::serde_as;
|
||||
pub use utilization::PageserverUtilization;
|
||||
@@ -24,7 +20,6 @@ use utils::{completion, serde_system_time};
|
||||
|
||||
use crate::config::Ratio;
|
||||
use crate::key::{CompactKey, Key};
|
||||
use crate::reltag::RelTag;
|
||||
use crate::shard::{DEFAULT_STRIPE_SIZE, ShardCount, ShardStripeSize, TenantShardId};
|
||||
|
||||
/// The state of a tenant in this pageserver.
|
||||
@@ -1907,219 +1902,6 @@ pub struct ScanDisposableKeysResponse {
|
||||
pub not_disposable_count: usize,
|
||||
}
|
||||
|
||||
// Wrapped in libpq CopyData
|
||||
#[derive(PartialEq, Eq, Debug)]
|
||||
pub enum PagestreamFeMessage {
|
||||
Exists(PagestreamExistsRequest),
|
||||
Nblocks(PagestreamNblocksRequest),
|
||||
GetPage(PagestreamGetPageRequest),
|
||||
DbSize(PagestreamDbSizeRequest),
|
||||
GetSlruSegment(PagestreamGetSlruSegmentRequest),
|
||||
#[cfg(feature = "testing")]
|
||||
Test(PagestreamTestRequest),
|
||||
}
|
||||
|
||||
// Wrapped in libpq CopyData
|
||||
#[derive(Debug, strum_macros::EnumProperty)]
|
||||
pub enum PagestreamBeMessage {
|
||||
Exists(PagestreamExistsResponse),
|
||||
Nblocks(PagestreamNblocksResponse),
|
||||
GetPage(PagestreamGetPageResponse),
|
||||
Error(PagestreamErrorResponse),
|
||||
DbSize(PagestreamDbSizeResponse),
|
||||
GetSlruSegment(PagestreamGetSlruSegmentResponse),
|
||||
#[cfg(feature = "testing")]
|
||||
Test(PagestreamTestResponse),
|
||||
}
|
||||
|
||||
// Keep in sync with `pagestore_client.h`
|
||||
#[repr(u8)]
|
||||
enum PagestreamFeMessageTag {
|
||||
Exists = 0,
|
||||
Nblocks = 1,
|
||||
GetPage = 2,
|
||||
DbSize = 3,
|
||||
GetSlruSegment = 4,
|
||||
/* future tags above this line */
|
||||
/// For testing purposes, not available in production.
|
||||
#[cfg(feature = "testing")]
|
||||
Test = 99,
|
||||
}
|
||||
|
||||
// Keep in sync with `pagestore_client.h`
|
||||
#[repr(u8)]
|
||||
enum PagestreamBeMessageTag {
|
||||
Exists = 100,
|
||||
Nblocks = 101,
|
||||
GetPage = 102,
|
||||
Error = 103,
|
||||
DbSize = 104,
|
||||
GetSlruSegment = 105,
|
||||
/* future tags above this line */
|
||||
/// For testing purposes, not available in production.
|
||||
#[cfg(feature = "testing")]
|
||||
Test = 199,
|
||||
}
|
||||
|
||||
impl TryFrom<u8> for PagestreamFeMessageTag {
|
||||
type Error = u8;
|
||||
fn try_from(value: u8) -> Result<Self, u8> {
|
||||
match value {
|
||||
0 => Ok(PagestreamFeMessageTag::Exists),
|
||||
1 => Ok(PagestreamFeMessageTag::Nblocks),
|
||||
2 => Ok(PagestreamFeMessageTag::GetPage),
|
||||
3 => Ok(PagestreamFeMessageTag::DbSize),
|
||||
4 => Ok(PagestreamFeMessageTag::GetSlruSegment),
|
||||
#[cfg(feature = "testing")]
|
||||
99 => Ok(PagestreamFeMessageTag::Test),
|
||||
_ => Err(value),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<u8> for PagestreamBeMessageTag {
|
||||
type Error = u8;
|
||||
fn try_from(value: u8) -> Result<Self, u8> {
|
||||
match value {
|
||||
100 => Ok(PagestreamBeMessageTag::Exists),
|
||||
101 => Ok(PagestreamBeMessageTag::Nblocks),
|
||||
102 => Ok(PagestreamBeMessageTag::GetPage),
|
||||
103 => Ok(PagestreamBeMessageTag::Error),
|
||||
104 => Ok(PagestreamBeMessageTag::DbSize),
|
||||
105 => Ok(PagestreamBeMessageTag::GetSlruSegment),
|
||||
#[cfg(feature = "testing")]
|
||||
199 => Ok(PagestreamBeMessageTag::Test),
|
||||
_ => Err(value),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// A GetPage request contains two LSN values:
|
||||
//
|
||||
// request_lsn: Get the page version at this point in time. Lsn::Max is a special value that means
|
||||
// "get the latest version present". It's used by the primary server, which knows that no one else
|
||||
// is writing WAL. 'not_modified_since' must be set to a proper value even if request_lsn is
|
||||
// Lsn::Max. Standby servers use the current replay LSN as the request LSN.
|
||||
//
|
||||
// not_modified_since: Hint to the pageserver that the client knows that the page has not been
|
||||
// modified between 'not_modified_since' and the request LSN. It's always correct to set
|
||||
// 'not_modified_since equal' to 'request_lsn' (unless Lsn::Max is used as the 'request_lsn'), but
|
||||
// passing an earlier LSN can speed up the request, by allowing the pageserver to process the
|
||||
// request without waiting for 'request_lsn' to arrive.
|
||||
//
|
||||
// The now-defunct V1 interface contained only one LSN, and a boolean 'latest' flag. The V1 interface was
|
||||
// sufficient for the primary; the 'lsn' was equivalent to the 'not_modified_since' value, and
|
||||
// 'latest' was set to true. The V2 interface was added because there was no correct way for a
|
||||
// standby to request a page at a particular non-latest LSN, and also include the
|
||||
// 'not_modified_since' hint. That led to an awkward choice of either using an old LSN in the
|
||||
// request, if the standby knows that the page hasn't been modified since, and risk getting an error
|
||||
// if that LSN has fallen behind the GC horizon, or requesting the current replay LSN, which could
|
||||
// require the pageserver unnecessarily to wait for the WAL to arrive up to that point. The new V2
|
||||
// interface allows sending both LSNs, and let the pageserver do the right thing. There was no
|
||||
// difference in the responses between V1 and V2.
|
||||
//
|
||||
// V3 version of protocol adds request ID to all requests. This request ID is also included in response
|
||||
// as well as other fields from requests, which allows to verify that we receive response for our request.
|
||||
// We copy fields from request to response to make checking more reliable: request ID is formed from process ID
|
||||
// and local counter, so in principle there can be duplicated requests IDs if process PID is reused.
|
||||
//
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
|
||||
pub enum PagestreamProtocolVersion {
|
||||
V2,
|
||||
V3,
|
||||
}
|
||||
|
||||
pub type RequestId = u64;
|
||||
|
||||
#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
|
||||
pub struct PagestreamRequest {
|
||||
pub reqid: RequestId,
|
||||
pub request_lsn: Lsn,
|
||||
pub not_modified_since: Lsn,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
|
||||
pub struct PagestreamExistsRequest {
|
||||
pub hdr: PagestreamRequest,
|
||||
pub rel: RelTag,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
|
||||
pub struct PagestreamNblocksRequest {
|
||||
pub hdr: PagestreamRequest,
|
||||
pub rel: RelTag,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
|
||||
pub struct PagestreamGetPageRequest {
|
||||
pub hdr: PagestreamRequest,
|
||||
pub rel: RelTag,
|
||||
pub blkno: u32,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
|
||||
pub struct PagestreamDbSizeRequest {
|
||||
pub hdr: PagestreamRequest,
|
||||
pub dbnode: u32,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
|
||||
pub struct PagestreamGetSlruSegmentRequest {
|
||||
pub hdr: PagestreamRequest,
|
||||
pub kind: u8,
|
||||
pub segno: u32,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct PagestreamExistsResponse {
|
||||
pub req: PagestreamExistsRequest,
|
||||
pub exists: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct PagestreamNblocksResponse {
|
||||
pub req: PagestreamNblocksRequest,
|
||||
pub n_blocks: u32,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct PagestreamGetPageResponse {
|
||||
pub req: PagestreamGetPageRequest,
|
||||
pub page: Bytes,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct PagestreamGetSlruSegmentResponse {
|
||||
pub req: PagestreamGetSlruSegmentRequest,
|
||||
pub segment: Bytes,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct PagestreamErrorResponse {
|
||||
pub req: PagestreamRequest,
|
||||
pub message: String,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct PagestreamDbSizeResponse {
|
||||
pub req: PagestreamDbSizeRequest,
|
||||
pub db_size: i64,
|
||||
}
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
#[derive(Debug, PartialEq, Eq, Clone)]
|
||||
pub struct PagestreamTestRequest {
|
||||
pub hdr: PagestreamRequest,
|
||||
pub batch_key: u64,
|
||||
pub message: String,
|
||||
}
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
#[derive(Debug)]
|
||||
pub struct PagestreamTestResponse {
|
||||
pub req: PagestreamTestRequest,
|
||||
}
|
||||
|
||||
// This is a cut-down version of TenantHistorySize from the pageserver crate, omitting fields
|
||||
// that require pageserver-internal types. It is sufficient to get the total size.
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
@@ -2131,506 +1913,6 @@ pub struct TenantHistorySize {
|
||||
pub size: Option<u64>,
|
||||
}
|
||||
|
||||
impl PagestreamFeMessage {
|
||||
/// Serialize a compute -> pageserver message. This is currently only used in testing
|
||||
/// tools. Always uses protocol version 3.
|
||||
pub fn serialize(&self) -> Bytes {
|
||||
let mut bytes = BytesMut::new();
|
||||
|
||||
match self {
|
||||
Self::Exists(req) => {
|
||||
bytes.put_u8(PagestreamFeMessageTag::Exists as u8);
|
||||
bytes.put_u64(req.hdr.reqid);
|
||||
bytes.put_u64(req.hdr.request_lsn.0);
|
||||
bytes.put_u64(req.hdr.not_modified_since.0);
|
||||
bytes.put_u32(req.rel.spcnode);
|
||||
bytes.put_u32(req.rel.dbnode);
|
||||
bytes.put_u32(req.rel.relnode);
|
||||
bytes.put_u8(req.rel.forknum);
|
||||
}
|
||||
|
||||
Self::Nblocks(req) => {
|
||||
bytes.put_u8(PagestreamFeMessageTag::Nblocks as u8);
|
||||
bytes.put_u64(req.hdr.reqid);
|
||||
bytes.put_u64(req.hdr.request_lsn.0);
|
||||
bytes.put_u64(req.hdr.not_modified_since.0);
|
||||
bytes.put_u32(req.rel.spcnode);
|
||||
bytes.put_u32(req.rel.dbnode);
|
||||
bytes.put_u32(req.rel.relnode);
|
||||
bytes.put_u8(req.rel.forknum);
|
||||
}
|
||||
|
||||
Self::GetPage(req) => {
|
||||
bytes.put_u8(PagestreamFeMessageTag::GetPage as u8);
|
||||
bytes.put_u64(req.hdr.reqid);
|
||||
bytes.put_u64(req.hdr.request_lsn.0);
|
||||
bytes.put_u64(req.hdr.not_modified_since.0);
|
||||
bytes.put_u32(req.rel.spcnode);
|
||||
bytes.put_u32(req.rel.dbnode);
|
||||
bytes.put_u32(req.rel.relnode);
|
||||
bytes.put_u8(req.rel.forknum);
|
||||
bytes.put_u32(req.blkno);
|
||||
}
|
||||
|
||||
Self::DbSize(req) => {
|
||||
bytes.put_u8(PagestreamFeMessageTag::DbSize as u8);
|
||||
bytes.put_u64(req.hdr.reqid);
|
||||
bytes.put_u64(req.hdr.request_lsn.0);
|
||||
bytes.put_u64(req.hdr.not_modified_since.0);
|
||||
bytes.put_u32(req.dbnode);
|
||||
}
|
||||
|
||||
Self::GetSlruSegment(req) => {
|
||||
bytes.put_u8(PagestreamFeMessageTag::GetSlruSegment as u8);
|
||||
bytes.put_u64(req.hdr.reqid);
|
||||
bytes.put_u64(req.hdr.request_lsn.0);
|
||||
bytes.put_u64(req.hdr.not_modified_since.0);
|
||||
bytes.put_u8(req.kind);
|
||||
bytes.put_u32(req.segno);
|
||||
}
|
||||
#[cfg(feature = "testing")]
|
||||
Self::Test(req) => {
|
||||
bytes.put_u8(PagestreamFeMessageTag::Test as u8);
|
||||
bytes.put_u64(req.hdr.reqid);
|
||||
bytes.put_u64(req.hdr.request_lsn.0);
|
||||
bytes.put_u64(req.hdr.not_modified_since.0);
|
||||
bytes.put_u64(req.batch_key);
|
||||
let message = req.message.as_bytes();
|
||||
bytes.put_u64(message.len() as u64);
|
||||
bytes.put_slice(message);
|
||||
}
|
||||
}
|
||||
|
||||
bytes.into()
|
||||
}
|
||||
|
||||
pub fn parse<R: std::io::Read>(
|
||||
body: &mut R,
|
||||
protocol_version: PagestreamProtocolVersion,
|
||||
) -> anyhow::Result<PagestreamFeMessage> {
|
||||
// these correspond to the NeonMessageTag enum in pagestore_client.h
|
||||
//
|
||||
// TODO: consider using protobuf or serde bincode for less error prone
|
||||
// serialization.
|
||||
let msg_tag = body.read_u8()?;
|
||||
let (reqid, request_lsn, not_modified_since) = match protocol_version {
|
||||
PagestreamProtocolVersion::V2 => (
|
||||
0,
|
||||
Lsn::from(body.read_u64::<BigEndian>()?),
|
||||
Lsn::from(body.read_u64::<BigEndian>()?),
|
||||
),
|
||||
PagestreamProtocolVersion::V3 => (
|
||||
body.read_u64::<BigEndian>()?,
|
||||
Lsn::from(body.read_u64::<BigEndian>()?),
|
||||
Lsn::from(body.read_u64::<BigEndian>()?),
|
||||
),
|
||||
};
|
||||
|
||||
match PagestreamFeMessageTag::try_from(msg_tag)
|
||||
.map_err(|tag: u8| anyhow::anyhow!("invalid tag {tag}"))?
|
||||
{
|
||||
PagestreamFeMessageTag::Exists => {
|
||||
Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
|
||||
hdr: PagestreamRequest {
|
||||
reqid,
|
||||
request_lsn,
|
||||
not_modified_since,
|
||||
},
|
||||
rel: RelTag {
|
||||
spcnode: body.read_u32::<BigEndian>()?,
|
||||
dbnode: body.read_u32::<BigEndian>()?,
|
||||
relnode: body.read_u32::<BigEndian>()?,
|
||||
forknum: body.read_u8()?,
|
||||
},
|
||||
}))
|
||||
}
|
||||
PagestreamFeMessageTag::Nblocks => {
|
||||
Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
|
||||
hdr: PagestreamRequest {
|
||||
reqid,
|
||||
request_lsn,
|
||||
not_modified_since,
|
||||
},
|
||||
rel: RelTag {
|
||||
spcnode: body.read_u32::<BigEndian>()?,
|
||||
dbnode: body.read_u32::<BigEndian>()?,
|
||||
relnode: body.read_u32::<BigEndian>()?,
|
||||
forknum: body.read_u8()?,
|
||||
},
|
||||
}))
|
||||
}
|
||||
PagestreamFeMessageTag::GetPage => {
|
||||
Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
|
||||
hdr: PagestreamRequest {
|
||||
reqid,
|
||||
request_lsn,
|
||||
not_modified_since,
|
||||
},
|
||||
rel: RelTag {
|
||||
spcnode: body.read_u32::<BigEndian>()?,
|
||||
dbnode: body.read_u32::<BigEndian>()?,
|
||||
relnode: body.read_u32::<BigEndian>()?,
|
||||
forknum: body.read_u8()?,
|
||||
},
|
||||
blkno: body.read_u32::<BigEndian>()?,
|
||||
}))
|
||||
}
|
||||
PagestreamFeMessageTag::DbSize => {
|
||||
Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
|
||||
hdr: PagestreamRequest {
|
||||
reqid,
|
||||
request_lsn,
|
||||
not_modified_since,
|
||||
},
|
||||
dbnode: body.read_u32::<BigEndian>()?,
|
||||
}))
|
||||
}
|
||||
PagestreamFeMessageTag::GetSlruSegment => Ok(PagestreamFeMessage::GetSlruSegment(
|
||||
PagestreamGetSlruSegmentRequest {
|
||||
hdr: PagestreamRequest {
|
||||
reqid,
|
||||
request_lsn,
|
||||
not_modified_since,
|
||||
},
|
||||
kind: body.read_u8()?,
|
||||
segno: body.read_u32::<BigEndian>()?,
|
||||
},
|
||||
)),
|
||||
#[cfg(feature = "testing")]
|
||||
PagestreamFeMessageTag::Test => Ok(PagestreamFeMessage::Test(PagestreamTestRequest {
|
||||
hdr: PagestreamRequest {
|
||||
reqid,
|
||||
request_lsn,
|
||||
not_modified_since,
|
||||
},
|
||||
batch_key: body.read_u64::<BigEndian>()?,
|
||||
message: {
|
||||
let len = body.read_u64::<BigEndian>()?;
|
||||
let mut buf = vec![0; len as usize];
|
||||
body.read_exact(&mut buf)?;
|
||||
String::from_utf8(buf)?
|
||||
},
|
||||
})),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl PagestreamBeMessage {
|
||||
pub fn serialize(&self, protocol_version: PagestreamProtocolVersion) -> Bytes {
|
||||
let mut bytes = BytesMut::new();
|
||||
|
||||
use PagestreamBeMessageTag as Tag;
|
||||
match protocol_version {
|
||||
PagestreamProtocolVersion::V2 => {
|
||||
match self {
|
||||
Self::Exists(resp) => {
|
||||
bytes.put_u8(Tag::Exists as u8);
|
||||
bytes.put_u8(resp.exists as u8);
|
||||
}
|
||||
|
||||
Self::Nblocks(resp) => {
|
||||
bytes.put_u8(Tag::Nblocks as u8);
|
||||
bytes.put_u32(resp.n_blocks);
|
||||
}
|
||||
|
||||
Self::GetPage(resp) => {
|
||||
bytes.put_u8(Tag::GetPage as u8);
|
||||
bytes.put(&resp.page[..])
|
||||
}
|
||||
|
||||
Self::Error(resp) => {
|
||||
bytes.put_u8(Tag::Error as u8);
|
||||
bytes.put(resp.message.as_bytes());
|
||||
bytes.put_u8(0); // null terminator
|
||||
}
|
||||
Self::DbSize(resp) => {
|
||||
bytes.put_u8(Tag::DbSize as u8);
|
||||
bytes.put_i64(resp.db_size);
|
||||
}
|
||||
|
||||
Self::GetSlruSegment(resp) => {
|
||||
bytes.put_u8(Tag::GetSlruSegment as u8);
|
||||
bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32);
|
||||
bytes.put(&resp.segment[..]);
|
||||
}
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
Self::Test(resp) => {
|
||||
bytes.put_u8(Tag::Test as u8);
|
||||
bytes.put_u64(resp.req.batch_key);
|
||||
let message = resp.req.message.as_bytes();
|
||||
bytes.put_u64(message.len() as u64);
|
||||
bytes.put_slice(message);
|
||||
}
|
||||
}
|
||||
}
|
||||
PagestreamProtocolVersion::V3 => {
|
||||
match self {
|
||||
Self::Exists(resp) => {
|
||||
bytes.put_u8(Tag::Exists as u8);
|
||||
bytes.put_u64(resp.req.hdr.reqid);
|
||||
bytes.put_u64(resp.req.hdr.request_lsn.0);
|
||||
bytes.put_u64(resp.req.hdr.not_modified_since.0);
|
||||
bytes.put_u32(resp.req.rel.spcnode);
|
||||
bytes.put_u32(resp.req.rel.dbnode);
|
||||
bytes.put_u32(resp.req.rel.relnode);
|
||||
bytes.put_u8(resp.req.rel.forknum);
|
||||
bytes.put_u8(resp.exists as u8);
|
||||
}
|
||||
|
||||
Self::Nblocks(resp) => {
|
||||
bytes.put_u8(Tag::Nblocks as u8);
|
||||
bytes.put_u64(resp.req.hdr.reqid);
|
||||
bytes.put_u64(resp.req.hdr.request_lsn.0);
|
||||
bytes.put_u64(resp.req.hdr.not_modified_since.0);
|
||||
bytes.put_u32(resp.req.rel.spcnode);
|
||||
bytes.put_u32(resp.req.rel.dbnode);
|
||||
bytes.put_u32(resp.req.rel.relnode);
|
||||
bytes.put_u8(resp.req.rel.forknum);
|
||||
bytes.put_u32(resp.n_blocks);
|
||||
}
|
||||
|
||||
Self::GetPage(resp) => {
|
||||
bytes.put_u8(Tag::GetPage as u8);
|
||||
bytes.put_u64(resp.req.hdr.reqid);
|
||||
bytes.put_u64(resp.req.hdr.request_lsn.0);
|
||||
bytes.put_u64(resp.req.hdr.not_modified_since.0);
|
||||
bytes.put_u32(resp.req.rel.spcnode);
|
||||
bytes.put_u32(resp.req.rel.dbnode);
|
||||
bytes.put_u32(resp.req.rel.relnode);
|
||||
bytes.put_u8(resp.req.rel.forknum);
|
||||
bytes.put_u32(resp.req.blkno);
|
||||
bytes.put(&resp.page[..])
|
||||
}
|
||||
|
||||
Self::Error(resp) => {
|
||||
bytes.put_u8(Tag::Error as u8);
|
||||
bytes.put_u64(resp.req.reqid);
|
||||
bytes.put_u64(resp.req.request_lsn.0);
|
||||
bytes.put_u64(resp.req.not_modified_since.0);
|
||||
bytes.put(resp.message.as_bytes());
|
||||
bytes.put_u8(0); // null terminator
|
||||
}
|
||||
Self::DbSize(resp) => {
|
||||
bytes.put_u8(Tag::DbSize as u8);
|
||||
bytes.put_u64(resp.req.hdr.reqid);
|
||||
bytes.put_u64(resp.req.hdr.request_lsn.0);
|
||||
bytes.put_u64(resp.req.hdr.not_modified_since.0);
|
||||
bytes.put_u32(resp.req.dbnode);
|
||||
bytes.put_i64(resp.db_size);
|
||||
}
|
||||
|
||||
Self::GetSlruSegment(resp) => {
|
||||
bytes.put_u8(Tag::GetSlruSegment as u8);
|
||||
bytes.put_u64(resp.req.hdr.reqid);
|
||||
bytes.put_u64(resp.req.hdr.request_lsn.0);
|
||||
bytes.put_u64(resp.req.hdr.not_modified_since.0);
|
||||
bytes.put_u8(resp.req.kind);
|
||||
bytes.put_u32(resp.req.segno);
|
||||
bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32);
|
||||
bytes.put(&resp.segment[..]);
|
||||
}
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
Self::Test(resp) => {
|
||||
bytes.put_u8(Tag::Test as u8);
|
||||
bytes.put_u64(resp.req.hdr.reqid);
|
||||
bytes.put_u64(resp.req.hdr.request_lsn.0);
|
||||
bytes.put_u64(resp.req.hdr.not_modified_since.0);
|
||||
bytes.put_u64(resp.req.batch_key);
|
||||
let message = resp.req.message.as_bytes();
|
||||
bytes.put_u64(message.len() as u64);
|
||||
bytes.put_slice(message);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
bytes.into()
|
||||
}
|
||||
|
||||
pub fn deserialize(buf: Bytes) -> anyhow::Result<Self> {
|
||||
let mut buf = buf.reader();
|
||||
let msg_tag = buf.read_u8()?;
|
||||
|
||||
use PagestreamBeMessageTag as Tag;
|
||||
let ok =
|
||||
match Tag::try_from(msg_tag).map_err(|tag: u8| anyhow::anyhow!("invalid tag {tag}"))? {
|
||||
Tag::Exists => {
|
||||
let reqid = buf.read_u64::<BigEndian>()?;
|
||||
let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
|
||||
let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
|
||||
let rel = RelTag {
|
||||
spcnode: buf.read_u32::<BigEndian>()?,
|
||||
dbnode: buf.read_u32::<BigEndian>()?,
|
||||
relnode: buf.read_u32::<BigEndian>()?,
|
||||
forknum: buf.read_u8()?,
|
||||
};
|
||||
let exists = buf.read_u8()? != 0;
|
||||
Self::Exists(PagestreamExistsResponse {
|
||||
req: PagestreamExistsRequest {
|
||||
hdr: PagestreamRequest {
|
||||
reqid,
|
||||
request_lsn,
|
||||
not_modified_since,
|
||||
},
|
||||
rel,
|
||||
},
|
||||
exists,
|
||||
})
|
||||
}
|
||||
Tag::Nblocks => {
|
||||
let reqid = buf.read_u64::<BigEndian>()?;
|
||||
let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
|
||||
let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
|
||||
let rel = RelTag {
|
||||
spcnode: buf.read_u32::<BigEndian>()?,
|
||||
dbnode: buf.read_u32::<BigEndian>()?,
|
||||
relnode: buf.read_u32::<BigEndian>()?,
|
||||
forknum: buf.read_u8()?,
|
||||
};
|
||||
let n_blocks = buf.read_u32::<BigEndian>()?;
|
||||
Self::Nblocks(PagestreamNblocksResponse {
|
||||
req: PagestreamNblocksRequest {
|
||||
hdr: PagestreamRequest {
|
||||
reqid,
|
||||
request_lsn,
|
||||
not_modified_since,
|
||||
},
|
||||
rel,
|
||||
},
|
||||
n_blocks,
|
||||
})
|
||||
}
|
||||
Tag::GetPage => {
|
||||
let reqid = buf.read_u64::<BigEndian>()?;
|
||||
let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
|
||||
let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
|
||||
let rel = RelTag {
|
||||
spcnode: buf.read_u32::<BigEndian>()?,
|
||||
dbnode: buf.read_u32::<BigEndian>()?,
|
||||
relnode: buf.read_u32::<BigEndian>()?,
|
||||
forknum: buf.read_u8()?,
|
||||
};
|
||||
let blkno = buf.read_u32::<BigEndian>()?;
|
||||
let mut page = vec![0; 8192]; // TODO: use MaybeUninit
|
||||
buf.read_exact(&mut page)?;
|
||||
Self::GetPage(PagestreamGetPageResponse {
|
||||
req: PagestreamGetPageRequest {
|
||||
hdr: PagestreamRequest {
|
||||
reqid,
|
||||
request_lsn,
|
||||
not_modified_since,
|
||||
},
|
||||
rel,
|
||||
blkno,
|
||||
},
|
||||
page: page.into(),
|
||||
})
|
||||
}
|
||||
Tag::Error => {
|
||||
let reqid = buf.read_u64::<BigEndian>()?;
|
||||
let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
|
||||
let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
|
||||
let mut msg = Vec::new();
|
||||
buf.read_until(0, &mut msg)?;
|
||||
let cstring = std::ffi::CString::from_vec_with_nul(msg)?;
|
||||
let rust_str = cstring.to_str()?;
|
||||
Self::Error(PagestreamErrorResponse {
|
||||
req: PagestreamRequest {
|
||||
reqid,
|
||||
request_lsn,
|
||||
not_modified_since,
|
||||
},
|
||||
message: rust_str.to_owned(),
|
||||
})
|
||||
}
|
||||
Tag::DbSize => {
|
||||
let reqid = buf.read_u64::<BigEndian>()?;
|
||||
let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
|
||||
let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
|
||||
let dbnode = buf.read_u32::<BigEndian>()?;
|
||||
let db_size = buf.read_i64::<BigEndian>()?;
|
||||
Self::DbSize(PagestreamDbSizeResponse {
|
||||
req: PagestreamDbSizeRequest {
|
||||
hdr: PagestreamRequest {
|
||||
reqid,
|
||||
request_lsn,
|
||||
not_modified_since,
|
||||
},
|
||||
dbnode,
|
||||
},
|
||||
db_size,
|
||||
})
|
||||
}
|
||||
Tag::GetSlruSegment => {
|
||||
let reqid = buf.read_u64::<BigEndian>()?;
|
||||
let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
|
||||
let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
|
||||
let kind = buf.read_u8()?;
|
||||
let segno = buf.read_u32::<BigEndian>()?;
|
||||
let n_blocks = buf.read_u32::<BigEndian>()?;
|
||||
let mut segment = vec![0; n_blocks as usize * BLCKSZ as usize];
|
||||
buf.read_exact(&mut segment)?;
|
||||
Self::GetSlruSegment(PagestreamGetSlruSegmentResponse {
|
||||
req: PagestreamGetSlruSegmentRequest {
|
||||
hdr: PagestreamRequest {
|
||||
reqid,
|
||||
request_lsn,
|
||||
not_modified_since,
|
||||
},
|
||||
kind,
|
||||
segno,
|
||||
},
|
||||
segment: segment.into(),
|
||||
})
|
||||
}
|
||||
#[cfg(feature = "testing")]
|
||||
Tag::Test => {
|
||||
let reqid = buf.read_u64::<BigEndian>()?;
|
||||
let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
|
||||
let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
|
||||
let batch_key = buf.read_u64::<BigEndian>()?;
|
||||
let len = buf.read_u64::<BigEndian>()?;
|
||||
let mut msg = vec![0; len as usize];
|
||||
buf.read_exact(&mut msg)?;
|
||||
let message = String::from_utf8(msg)?;
|
||||
Self::Test(PagestreamTestResponse {
|
||||
req: PagestreamTestRequest {
|
||||
hdr: PagestreamRequest {
|
||||
reqid,
|
||||
request_lsn,
|
||||
not_modified_since,
|
||||
},
|
||||
batch_key,
|
||||
message,
|
||||
},
|
||||
})
|
||||
}
|
||||
};
|
||||
let remaining = buf.into_inner();
|
||||
if !remaining.is_empty() {
|
||||
anyhow::bail!(
|
||||
"remaining bytes in msg with tag={msg_tag}: {}",
|
||||
remaining.len()
|
||||
);
|
||||
}
|
||||
Ok(ok)
|
||||
}
|
||||
|
||||
pub fn kind(&self) -> &'static str {
|
||||
match self {
|
||||
Self::Exists(_) => "Exists",
|
||||
Self::Nblocks(_) => "Nblocks",
|
||||
Self::GetPage(_) => "GetPage",
|
||||
Self::Error(_) => "Error",
|
||||
Self::DbSize(_) => "DbSize",
|
||||
Self::GetSlruSegment(_) => "GetSlruSegment",
|
||||
#[cfg(feature = "testing")]
|
||||
Self::Test(_) => "Test",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct PageTraceEvent {
|
||||
pub key: CompactKey,
|
||||
@@ -2656,68 +1938,6 @@ mod tests {
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
fn test_pagestream() {
    // Round-trip every non-testing PagestreamFeMessage variant through
    // serialize() and parse() and check that nothing is lost.
    let hdr = |not_modified_since: u64| PagestreamRequest {
        reqid: 0,
        request_lsn: Lsn(4),
        not_modified_since: Lsn(not_modified_since),
    };
    let rel = || RelTag {
        forknum: 1,
        spcnode: 2,
        dbnode: 3,
        relnode: 4,
    };

    let messages = vec![
        PagestreamFeMessage::Exists(PagestreamExistsRequest {
            hdr: hdr(3),
            rel: rel(),
        }),
        PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
            hdr: hdr(4),
            rel: rel(),
        }),
        PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
            hdr: hdr(3),
            rel: rel(),
            blkno: 7,
        }),
        PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
            hdr: hdr(3),
            dbnode: 7,
        }),
        PagestreamFeMessage::GetSlruSegment(PagestreamGetSlruSegmentRequest {
            hdr: hdr(3),
            kind: 0,
            segno: 9,
        }),
    ];

    for msg in messages {
        let bytes = msg.serialize();
        // serialize() always emits the V3 layout, so parse with V3.
        let reconstructed =
            PagestreamFeMessage::parse(&mut bytes.reader(), PagestreamProtocolVersion::V3)
                .unwrap();
        // assert_eq! prints both values when the round trip does not match.
        assert_eq!(msg, reconstructed);
    }
}
|
||||
|
||||
#[test]
|
||||
fn test_tenantinfo_serde() {
|
||||
// Test serialization/deserialization of TenantInfo
|
||||
|
||||
798
libs/pageserver_api/src/pagestream_api.rs
Normal file
798
libs/pageserver_api/src/pagestream_api.rs
Normal file
@@ -0,0 +1,798 @@
|
||||
//! Rust definitions of the libpq-based pagestream API
|
||||
//!
|
||||
//! See also the C implementation of the same API in pgxn/neon/pagestore_client.h
|
||||
|
||||
use std::io::{BufRead, Read};
|
||||
|
||||
use crate::reltag::RelTag;
|
||||
|
||||
use byteorder::{BigEndian, ReadBytesExt};
|
||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
/// Block size.
|
||||
///
|
||||
/// XXX: We assume 8k block size in the SLRU fetch API. It's not great to hardcode
|
||||
/// that in the protocol, because Postgres supports different block sizes as a compile
|
||||
/// time option.
|
||||
const BLCKSZ: usize = 8192;
|
||||
|
||||
// Wrapped in libpq CopyData
// Request messages: one variant per request kind, each carrying the matching
// `Pagestream*Request` struct.
|
||||
#[derive(PartialEq, Eq, Debug)]
|
||||
pub enum PagestreamFeMessage {
|
||||
Exists(PagestreamExistsRequest),
|
||||
Nblocks(PagestreamNblocksRequest),
|
||||
GetPage(PagestreamGetPageRequest),
|
||||
DbSize(PagestreamDbSizeRequest),
|
||||
GetSlruSegment(PagestreamGetSlruSegmentRequest),
|
||||
// Only compiled in when the "testing" feature is enabled.
#[cfg(feature = "testing")]
|
||||
Test(PagestreamTestRequest),
|
||||
}
|
||||
|
||||
// Wrapped in libpq CopyData
// Response messages: one variant per response kind, each carrying the matching
// `Pagestream*Response` struct. `Error` has no request-side counterpart.
|
||||
#[derive(Debug, strum_macros::EnumProperty)]
|
||||
pub enum PagestreamBeMessage {
|
||||
Exists(PagestreamExistsResponse),
|
||||
Nblocks(PagestreamNblocksResponse),
|
||||
GetPage(PagestreamGetPageResponse),
|
||||
Error(PagestreamErrorResponse),
|
||||
DbSize(PagestreamDbSizeResponse),
|
||||
GetSlruSegment(PagestreamGetSlruSegmentResponse),
|
||||
// Only compiled in when the "testing" feature is enabled.
#[cfg(feature = "testing")]
|
||||
Test(PagestreamTestResponse),
|
||||
}
|
||||
|
||||
// Keep in sync with `pagestore_client.h`
// Tag values for request messages; the tag is the first byte of a serialized
// `PagestreamFeMessage`.
|
||||
#[repr(u8)]
|
||||
enum PagestreamFeMessageTag {
|
||||
Exists = 0,
|
||||
Nblocks = 1,
|
||||
GetPage = 2,
|
||||
DbSize = 3,
|
||||
GetSlruSegment = 4,
|
||||
/* future tags above this line */
|
||||
/// For testing purposes, not available in production.
|
||||
#[cfg(feature = "testing")]
|
||||
Test = 99,
|
||||
}
|
||||
|
||||
// Keep in sync with `pagestore_client.h`
// Tag values for response messages; the tag is the first byte of a serialized
// `PagestreamBeMessage`. The values start at 100, so they do not overlap with
// the request tags.
|
||||
#[repr(u8)]
|
||||
enum PagestreamBeMessageTag {
|
||||
Exists = 100,
|
||||
Nblocks = 101,
|
||||
GetPage = 102,
|
||||
Error = 103,
|
||||
DbSize = 104,
|
||||
GetSlruSegment = 105,
|
||||
/* future tags above this line */
|
||||
/// For testing purposes, not available in production.
|
||||
#[cfg(feature = "testing")]
|
||||
Test = 199,
|
||||
}
|
||||
|
||||
impl TryFrom<u8> for PagestreamFeMessageTag {
|
||||
type Error = u8;
|
||||
fn try_from(value: u8) -> Result<Self, u8> {
|
||||
match value {
|
||||
0 => Ok(PagestreamFeMessageTag::Exists),
|
||||
1 => Ok(PagestreamFeMessageTag::Nblocks),
|
||||
2 => Ok(PagestreamFeMessageTag::GetPage),
|
||||
3 => Ok(PagestreamFeMessageTag::DbSize),
|
||||
4 => Ok(PagestreamFeMessageTag::GetSlruSegment),
|
||||
#[cfg(feature = "testing")]
|
||||
99 => Ok(PagestreamFeMessageTag::Test),
|
||||
_ => Err(value),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<u8> for PagestreamBeMessageTag {
|
||||
type Error = u8;
|
||||
fn try_from(value: u8) -> Result<Self, u8> {
|
||||
match value {
|
||||
100 => Ok(PagestreamBeMessageTag::Exists),
|
||||
101 => Ok(PagestreamBeMessageTag::Nblocks),
|
||||
102 => Ok(PagestreamBeMessageTag::GetPage),
|
||||
103 => Ok(PagestreamBeMessageTag::Error),
|
||||
104 => Ok(PagestreamBeMessageTag::DbSize),
|
||||
105 => Ok(PagestreamBeMessageTag::GetSlruSegment),
|
||||
#[cfg(feature = "testing")]
|
||||
199 => Ok(PagestreamBeMessageTag::Test),
|
||||
_ => Err(value),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// A GetPage request contains two LSN values:
|
||||
//
|
||||
// request_lsn: Get the page version at this point in time. Lsn::Max is a special value that means
|
||||
// "get the latest version present". It's used by the primary server, which knows that no one else
|
||||
// is writing WAL. 'not_modified_since' must be set to a proper value even if request_lsn is
|
||||
// Lsn::Max. Standby servers use the current replay LSN as the request LSN.
|
||||
//
|
||||
// not_modified_since: Hint to the pageserver that the client knows that the page has not been
|
||||
// modified between 'not_modified_since' and the request LSN. It's always correct to set
|
||||
// 'not_modified_since' equal to 'request_lsn' (unless Lsn::Max is used as the 'request_lsn'), but
|
||||
// passing an earlier LSN can speed up the request, by allowing the pageserver to process the
|
||||
// request without waiting for 'request_lsn' to arrive.
|
||||
//
|
||||
// The now-defunct V1 interface contained only one LSN, and a boolean 'latest' flag. The V1 interface was
|
||||
// sufficient for the primary; the 'lsn' was equivalent to the 'not_modified_since' value, and
|
||||
// 'latest' was set to true. The V2 interface was added because there was no correct way for a
|
||||
// standby to request a page at a particular non-latest LSN, and also include the
|
||||
// 'not_modified_since' hint. That led to an awkward choice of either using an old LSN in the
|
||||
// request, if the standby knows that the page hasn't been modified since, and risk getting an error
|
||||
// if that LSN has fallen behind the GC horizon, or requesting the current replay LSN, which could
|
||||
// require the pageserver unnecessarily to wait for the WAL to arrive up to that point. The new V2
|
||||
// interface allows sending both LSNs, and lets the pageserver do the right thing. There was no
|
||||
// difference in the responses between V1 and V2.
|
||||
//
|
||||
// Version V3 of the protocol adds a request ID to all requests. This request ID is also included in the response
|
||||
// as well as other fields from the request, which allows verifying that a received response matches our request.
|
||||
// We copy fields from request to response to make checking more reliable: request ID is formed from process ID
|
||||
// and a local counter, so in principle there can be duplicate request IDs if a process PID is reused.
|
||||
//
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
|
||||
pub enum PagestreamProtocolVersion {
|
||||
V2,
|
||||
V3,
|
||||
}
|
||||
|
||||
pub type RequestId = u64;
|
||||
|
||||
#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
|
||||
pub struct PagestreamRequest {
|
||||
pub reqid: RequestId,
|
||||
pub request_lsn: Lsn,
|
||||
pub not_modified_since: Lsn,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
|
||||
pub struct PagestreamExistsRequest {
|
||||
pub hdr: PagestreamRequest,
|
||||
pub rel: RelTag,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
|
||||
pub struct PagestreamNblocksRequest {
|
||||
pub hdr: PagestreamRequest,
|
||||
pub rel: RelTag,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
|
||||
pub struct PagestreamGetPageRequest {
|
||||
pub hdr: PagestreamRequest,
|
||||
pub rel: RelTag,
|
||||
pub blkno: u32,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
|
||||
pub struct PagestreamDbSizeRequest {
|
||||
pub hdr: PagestreamRequest,
|
||||
pub dbnode: u32,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
|
||||
pub struct PagestreamGetSlruSegmentRequest {
|
||||
pub hdr: PagestreamRequest,
|
||||
pub kind: u8,
|
||||
pub segno: u32,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct PagestreamExistsResponse {
|
||||
pub req: PagestreamExistsRequest,
|
||||
pub exists: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct PagestreamNblocksResponse {
|
||||
pub req: PagestreamNblocksRequest,
|
||||
pub n_blocks: u32,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct PagestreamGetPageResponse {
|
||||
pub req: PagestreamGetPageRequest,
|
||||
pub page: Bytes,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct PagestreamGetSlruSegmentResponse {
|
||||
pub req: PagestreamGetSlruSegmentRequest,
|
||||
pub segment: Bytes,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct PagestreamErrorResponse {
|
||||
pub req: PagestreamRequest,
|
||||
pub message: String,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct PagestreamDbSizeResponse {
|
||||
pub req: PagestreamDbSizeRequest,
|
||||
pub db_size: i64,
|
||||
}
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
#[derive(Debug, PartialEq, Eq, Clone)]
|
||||
pub struct PagestreamTestRequest {
|
||||
pub hdr: PagestreamRequest,
|
||||
pub batch_key: u64,
|
||||
pub message: String,
|
||||
}
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
#[derive(Debug)]
|
||||
pub struct PagestreamTestResponse {
|
||||
pub req: PagestreamTestRequest,
|
||||
}
|
||||
|
||||
impl PagestreamFeMessage {
|
||||
/// Serialize a compute -> pageserver message. This is currently only used in testing
|
||||
/// tools. Always uses protocol version 3.
|
||||
pub fn serialize(&self) -> Bytes {
|
||||
let mut bytes = BytesMut::new();
|
||||
|
||||
match self {
|
||||
Self::Exists(req) => {
|
||||
bytes.put_u8(PagestreamFeMessageTag::Exists as u8);
|
||||
bytes.put_u64(req.hdr.reqid);
|
||||
bytes.put_u64(req.hdr.request_lsn.0);
|
||||
bytes.put_u64(req.hdr.not_modified_since.0);
|
||||
bytes.put_u32(req.rel.spcnode);
|
||||
bytes.put_u32(req.rel.dbnode);
|
||||
bytes.put_u32(req.rel.relnode);
|
||||
bytes.put_u8(req.rel.forknum);
|
||||
}
|
||||
|
||||
Self::Nblocks(req) => {
|
||||
bytes.put_u8(PagestreamFeMessageTag::Nblocks as u8);
|
||||
bytes.put_u64(req.hdr.reqid);
|
||||
bytes.put_u64(req.hdr.request_lsn.0);
|
||||
bytes.put_u64(req.hdr.not_modified_since.0);
|
||||
bytes.put_u32(req.rel.spcnode);
|
||||
bytes.put_u32(req.rel.dbnode);
|
||||
bytes.put_u32(req.rel.relnode);
|
||||
bytes.put_u8(req.rel.forknum);
|
||||
}
|
||||
|
||||
Self::GetPage(req) => {
|
||||
bytes.put_u8(PagestreamFeMessageTag::GetPage as u8);
|
||||
bytes.put_u64(req.hdr.reqid);
|
||||
bytes.put_u64(req.hdr.request_lsn.0);
|
||||
bytes.put_u64(req.hdr.not_modified_since.0);
|
||||
bytes.put_u32(req.rel.spcnode);
|
||||
bytes.put_u32(req.rel.dbnode);
|
||||
bytes.put_u32(req.rel.relnode);
|
||||
bytes.put_u8(req.rel.forknum);
|
||||
bytes.put_u32(req.blkno);
|
||||
}
|
||||
|
||||
Self::DbSize(req) => {
|
||||
bytes.put_u8(PagestreamFeMessageTag::DbSize as u8);
|
||||
bytes.put_u64(req.hdr.reqid);
|
||||
bytes.put_u64(req.hdr.request_lsn.0);
|
||||
bytes.put_u64(req.hdr.not_modified_since.0);
|
||||
bytes.put_u32(req.dbnode);
|
||||
}
|
||||
|
||||
Self::GetSlruSegment(req) => {
|
||||
bytes.put_u8(PagestreamFeMessageTag::GetSlruSegment as u8);
|
||||
bytes.put_u64(req.hdr.reqid);
|
||||
bytes.put_u64(req.hdr.request_lsn.0);
|
||||
bytes.put_u64(req.hdr.not_modified_since.0);
|
||||
bytes.put_u8(req.kind);
|
||||
bytes.put_u32(req.segno);
|
||||
}
|
||||
#[cfg(feature = "testing")]
|
||||
Self::Test(req) => {
|
||||
bytes.put_u8(PagestreamFeMessageTag::Test as u8);
|
||||
bytes.put_u64(req.hdr.reqid);
|
||||
bytes.put_u64(req.hdr.request_lsn.0);
|
||||
bytes.put_u64(req.hdr.not_modified_since.0);
|
||||
bytes.put_u64(req.batch_key);
|
||||
let message = req.message.as_bytes();
|
||||
bytes.put_u64(message.len() as u64);
|
||||
bytes.put_slice(message);
|
||||
}
|
||||
}
|
||||
|
||||
bytes.into()
|
||||
}
|
||||
|
||||
pub fn parse<R: std::io::Read>(
|
||||
body: &mut R,
|
||||
protocol_version: PagestreamProtocolVersion,
|
||||
) -> anyhow::Result<PagestreamFeMessage> {
|
||||
// these correspond to the NeonMessageTag enum in pagestore_client.h
|
||||
//
|
||||
// TODO: consider using protobuf or serde bincode for less error prone
|
||||
// serialization.
|
||||
let msg_tag = body.read_u8()?;
|
||||
let (reqid, request_lsn, not_modified_since) = match protocol_version {
|
||||
PagestreamProtocolVersion::V2 => (
|
||||
0,
|
||||
Lsn::from(body.read_u64::<BigEndian>()?),
|
||||
Lsn::from(body.read_u64::<BigEndian>()?),
|
||||
),
|
||||
PagestreamProtocolVersion::V3 => (
|
||||
body.read_u64::<BigEndian>()?,
|
||||
Lsn::from(body.read_u64::<BigEndian>()?),
|
||||
Lsn::from(body.read_u64::<BigEndian>()?),
|
||||
),
|
||||
};
|
||||
|
||||
match PagestreamFeMessageTag::try_from(msg_tag)
|
||||
.map_err(|tag: u8| anyhow::anyhow!("invalid tag {tag}"))?
|
||||
{
|
||||
PagestreamFeMessageTag::Exists => {
|
||||
Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
|
||||
hdr: PagestreamRequest {
|
||||
reqid,
|
||||
request_lsn,
|
||||
not_modified_since,
|
||||
},
|
||||
rel: RelTag {
|
||||
spcnode: body.read_u32::<BigEndian>()?,
|
||||
dbnode: body.read_u32::<BigEndian>()?,
|
||||
relnode: body.read_u32::<BigEndian>()?,
|
||||
forknum: body.read_u8()?,
|
||||
},
|
||||
}))
|
||||
}
|
||||
PagestreamFeMessageTag::Nblocks => {
|
||||
Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
|
||||
hdr: PagestreamRequest {
|
||||
reqid,
|
||||
request_lsn,
|
||||
not_modified_since,
|
||||
},
|
||||
rel: RelTag {
|
||||
spcnode: body.read_u32::<BigEndian>()?,
|
||||
dbnode: body.read_u32::<BigEndian>()?,
|
||||
relnode: body.read_u32::<BigEndian>()?,
|
||||
forknum: body.read_u8()?,
|
||||
},
|
||||
}))
|
||||
}
|
||||
PagestreamFeMessageTag::GetPage => {
|
||||
Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
|
||||
hdr: PagestreamRequest {
|
||||
reqid,
|
||||
request_lsn,
|
||||
not_modified_since,
|
||||
},
|
||||
rel: RelTag {
|
||||
spcnode: body.read_u32::<BigEndian>()?,
|
||||
dbnode: body.read_u32::<BigEndian>()?,
|
||||
relnode: body.read_u32::<BigEndian>()?,
|
||||
forknum: body.read_u8()?,
|
||||
},
|
||||
blkno: body.read_u32::<BigEndian>()?,
|
||||
}))
|
||||
}
|
||||
PagestreamFeMessageTag::DbSize => {
|
||||
Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
|
||||
hdr: PagestreamRequest {
|
||||
reqid,
|
||||
request_lsn,
|
||||
not_modified_since,
|
||||
},
|
||||
dbnode: body.read_u32::<BigEndian>()?,
|
||||
}))
|
||||
}
|
||||
PagestreamFeMessageTag::GetSlruSegment => Ok(PagestreamFeMessage::GetSlruSegment(
|
||||
PagestreamGetSlruSegmentRequest {
|
||||
hdr: PagestreamRequest {
|
||||
reqid,
|
||||
request_lsn,
|
||||
not_modified_since,
|
||||
},
|
||||
kind: body.read_u8()?,
|
||||
segno: body.read_u32::<BigEndian>()?,
|
||||
},
|
||||
)),
|
||||
#[cfg(feature = "testing")]
|
||||
PagestreamFeMessageTag::Test => Ok(PagestreamFeMessage::Test(PagestreamTestRequest {
|
||||
hdr: PagestreamRequest {
|
||||
reqid,
|
||||
request_lsn,
|
||||
not_modified_since,
|
||||
},
|
||||
batch_key: body.read_u64::<BigEndian>()?,
|
||||
message: {
|
||||
let len = body.read_u64::<BigEndian>()?;
|
||||
let mut buf = vec![0; len as usize];
|
||||
body.read_exact(&mut buf)?;
|
||||
String::from_utf8(buf)?
|
||||
},
|
||||
})),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl PagestreamBeMessage {
|
||||
pub fn serialize(&self, protocol_version: PagestreamProtocolVersion) -> Bytes {
|
||||
let mut bytes = BytesMut::new();
|
||||
|
||||
use PagestreamBeMessageTag as Tag;
|
||||
match protocol_version {
|
||||
PagestreamProtocolVersion::V2 => {
|
||||
match self {
|
||||
Self::Exists(resp) => {
|
||||
bytes.put_u8(Tag::Exists as u8);
|
||||
bytes.put_u8(resp.exists as u8);
|
||||
}
|
||||
|
||||
Self::Nblocks(resp) => {
|
||||
bytes.put_u8(Tag::Nblocks as u8);
|
||||
bytes.put_u32(resp.n_blocks);
|
||||
}
|
||||
|
||||
Self::GetPage(resp) => {
|
||||
bytes.put_u8(Tag::GetPage as u8);
|
||||
bytes.put(&resp.page[..])
|
||||
}
|
||||
|
||||
Self::Error(resp) => {
|
||||
bytes.put_u8(Tag::Error as u8);
|
||||
bytes.put(resp.message.as_bytes());
|
||||
bytes.put_u8(0); // null terminator
|
||||
}
|
||||
Self::DbSize(resp) => {
|
||||
bytes.put_u8(Tag::DbSize as u8);
|
||||
bytes.put_i64(resp.db_size);
|
||||
}
|
||||
|
||||
Self::GetSlruSegment(resp) => {
|
||||
bytes.put_u8(Tag::GetSlruSegment as u8);
|
||||
bytes.put_u32((resp.segment.len() / BLCKSZ) as u32);
|
||||
bytes.put(&resp.segment[..]);
|
||||
}
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
Self::Test(resp) => {
|
||||
bytes.put_u8(Tag::Test as u8);
|
||||
bytes.put_u64(resp.req.batch_key);
|
||||
let message = resp.req.message.as_bytes();
|
||||
bytes.put_u64(message.len() as u64);
|
||||
bytes.put_slice(message);
|
||||
}
|
||||
}
|
||||
}
|
||||
PagestreamProtocolVersion::V3 => {
|
||||
match self {
|
||||
Self::Exists(resp) => {
|
||||
bytes.put_u8(Tag::Exists as u8);
|
||||
bytes.put_u64(resp.req.hdr.reqid);
|
||||
bytes.put_u64(resp.req.hdr.request_lsn.0);
|
||||
bytes.put_u64(resp.req.hdr.not_modified_since.0);
|
||||
bytes.put_u32(resp.req.rel.spcnode);
|
||||
bytes.put_u32(resp.req.rel.dbnode);
|
||||
bytes.put_u32(resp.req.rel.relnode);
|
||||
bytes.put_u8(resp.req.rel.forknum);
|
||||
bytes.put_u8(resp.exists as u8);
|
||||
}
|
||||
|
||||
Self::Nblocks(resp) => {
|
||||
bytes.put_u8(Tag::Nblocks as u8);
|
||||
bytes.put_u64(resp.req.hdr.reqid);
|
||||
bytes.put_u64(resp.req.hdr.request_lsn.0);
|
||||
bytes.put_u64(resp.req.hdr.not_modified_since.0);
|
||||
bytes.put_u32(resp.req.rel.spcnode);
|
||||
bytes.put_u32(resp.req.rel.dbnode);
|
||||
bytes.put_u32(resp.req.rel.relnode);
|
||||
bytes.put_u8(resp.req.rel.forknum);
|
||||
bytes.put_u32(resp.n_blocks);
|
||||
}
|
||||
|
||||
Self::GetPage(resp) => {
|
||||
bytes.put_u8(Tag::GetPage as u8);
|
||||
bytes.put_u64(resp.req.hdr.reqid);
|
||||
bytes.put_u64(resp.req.hdr.request_lsn.0);
|
||||
bytes.put_u64(resp.req.hdr.not_modified_since.0);
|
||||
bytes.put_u32(resp.req.rel.spcnode);
|
||||
bytes.put_u32(resp.req.rel.dbnode);
|
||||
bytes.put_u32(resp.req.rel.relnode);
|
||||
bytes.put_u8(resp.req.rel.forknum);
|
||||
bytes.put_u32(resp.req.blkno);
|
||||
bytes.put(&resp.page[..])
|
||||
}
|
||||
|
||||
Self::Error(resp) => {
|
||||
bytes.put_u8(Tag::Error as u8);
|
||||
bytes.put_u64(resp.req.reqid);
|
||||
bytes.put_u64(resp.req.request_lsn.0);
|
||||
bytes.put_u64(resp.req.not_modified_since.0);
|
||||
bytes.put(resp.message.as_bytes());
|
||||
bytes.put_u8(0); // null terminator
|
||||
}
|
||||
Self::DbSize(resp) => {
|
||||
bytes.put_u8(Tag::DbSize as u8);
|
||||
bytes.put_u64(resp.req.hdr.reqid);
|
||||
bytes.put_u64(resp.req.hdr.request_lsn.0);
|
||||
bytes.put_u64(resp.req.hdr.not_modified_since.0);
|
||||
bytes.put_u32(resp.req.dbnode);
|
||||
bytes.put_i64(resp.db_size);
|
||||
}
|
||||
|
||||
Self::GetSlruSegment(resp) => {
|
||||
bytes.put_u8(Tag::GetSlruSegment as u8);
|
||||
bytes.put_u64(resp.req.hdr.reqid);
|
||||
bytes.put_u64(resp.req.hdr.request_lsn.0);
|
||||
bytes.put_u64(resp.req.hdr.not_modified_since.0);
|
||||
bytes.put_u8(resp.req.kind);
|
||||
bytes.put_u32(resp.req.segno);
|
||||
bytes.put_u32((resp.segment.len() / BLCKSZ) as u32);
|
||||
bytes.put(&resp.segment[..]);
|
||||
}
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
Self::Test(resp) => {
|
||||
bytes.put_u8(Tag::Test as u8);
|
||||
bytes.put_u64(resp.req.hdr.reqid);
|
||||
bytes.put_u64(resp.req.hdr.request_lsn.0);
|
||||
bytes.put_u64(resp.req.hdr.not_modified_since.0);
|
||||
bytes.put_u64(resp.req.batch_key);
|
||||
let message = resp.req.message.as_bytes();
|
||||
bytes.put_u64(message.len() as u64);
|
||||
bytes.put_slice(message);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
bytes.into()
|
||||
}
|
||||
|
||||
pub fn deserialize(buf: Bytes) -> anyhow::Result<Self> {
|
||||
let mut buf = buf.reader();
|
||||
let msg_tag = buf.read_u8()?;
|
||||
|
||||
use PagestreamBeMessageTag as Tag;
|
||||
let ok =
|
||||
match Tag::try_from(msg_tag).map_err(|tag: u8| anyhow::anyhow!("invalid tag {tag}"))? {
|
||||
Tag::Exists => {
|
||||
let reqid = buf.read_u64::<BigEndian>()?;
|
||||
let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
|
||||
let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
|
||||
let rel = RelTag {
|
||||
spcnode: buf.read_u32::<BigEndian>()?,
|
||||
dbnode: buf.read_u32::<BigEndian>()?,
|
||||
relnode: buf.read_u32::<BigEndian>()?,
|
||||
forknum: buf.read_u8()?,
|
||||
};
|
||||
let exists = buf.read_u8()? != 0;
|
||||
Self::Exists(PagestreamExistsResponse {
|
||||
req: PagestreamExistsRequest {
|
||||
hdr: PagestreamRequest {
|
||||
reqid,
|
||||
request_lsn,
|
||||
not_modified_since,
|
||||
},
|
||||
rel,
|
||||
},
|
||||
exists,
|
||||
})
|
||||
}
|
||||
Tag::Nblocks => {
|
||||
let reqid = buf.read_u64::<BigEndian>()?;
|
||||
let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
|
||||
let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
|
||||
let rel = RelTag {
|
||||
spcnode: buf.read_u32::<BigEndian>()?,
|
||||
dbnode: buf.read_u32::<BigEndian>()?,
|
||||
relnode: buf.read_u32::<BigEndian>()?,
|
||||
forknum: buf.read_u8()?,
|
||||
};
|
||||
let n_blocks = buf.read_u32::<BigEndian>()?;
|
||||
Self::Nblocks(PagestreamNblocksResponse {
|
||||
req: PagestreamNblocksRequest {
|
||||
hdr: PagestreamRequest {
|
||||
reqid,
|
||||
request_lsn,
|
||||
not_modified_since,
|
||||
},
|
||||
rel,
|
||||
},
|
||||
n_blocks,
|
||||
})
|
||||
}
|
||||
Tag::GetPage => {
|
||||
let reqid = buf.read_u64::<BigEndian>()?;
|
||||
let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
|
||||
let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
|
||||
let rel = RelTag {
|
||||
spcnode: buf.read_u32::<BigEndian>()?,
|
||||
dbnode: buf.read_u32::<BigEndian>()?,
|
||||
relnode: buf.read_u32::<BigEndian>()?,
|
||||
forknum: buf.read_u8()?,
|
||||
};
|
||||
let blkno = buf.read_u32::<BigEndian>()?;
|
||||
let mut page = vec![0; 8192]; // TODO: use MaybeUninit
|
||||
buf.read_exact(&mut page)?;
|
||||
Self::GetPage(PagestreamGetPageResponse {
|
||||
req: PagestreamGetPageRequest {
|
||||
hdr: PagestreamRequest {
|
||||
reqid,
|
||||
request_lsn,
|
||||
not_modified_since,
|
||||
},
|
||||
rel,
|
||||
blkno,
|
||||
},
|
||||
page: page.into(),
|
||||
})
|
||||
}
|
||||
Tag::Error => {
|
||||
let reqid = buf.read_u64::<BigEndian>()?;
|
||||
let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
|
||||
let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
|
||||
let mut msg = Vec::new();
|
||||
buf.read_until(0, &mut msg)?;
|
||||
let cstring = std::ffi::CString::from_vec_with_nul(msg)?;
|
||||
let rust_str = cstring.to_str()?;
|
||||
Self::Error(PagestreamErrorResponse {
|
||||
req: PagestreamRequest {
|
||||
reqid,
|
||||
request_lsn,
|
||||
not_modified_since,
|
||||
},
|
||||
message: rust_str.to_owned(),
|
||||
})
|
||||
}
|
||||
Tag::DbSize => {
|
||||
let reqid = buf.read_u64::<BigEndian>()?;
|
||||
let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
|
||||
let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
|
||||
let dbnode = buf.read_u32::<BigEndian>()?;
|
||||
let db_size = buf.read_i64::<BigEndian>()?;
|
||||
Self::DbSize(PagestreamDbSizeResponse {
|
||||
req: PagestreamDbSizeRequest {
|
||||
hdr: PagestreamRequest {
|
||||
reqid,
|
||||
request_lsn,
|
||||
not_modified_since,
|
||||
},
|
||||
dbnode,
|
||||
},
|
||||
db_size,
|
||||
})
|
||||
}
|
||||
Tag::GetSlruSegment => {
|
||||
let reqid = buf.read_u64::<BigEndian>()?;
|
||||
let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
|
||||
let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
|
||||
let kind = buf.read_u8()?;
|
||||
let segno = buf.read_u32::<BigEndian>()?;
|
||||
let n_blocks = buf.read_u32::<BigEndian>()?;
|
||||
let mut segment = vec![0; n_blocks as usize * BLCKSZ];
|
||||
buf.read_exact(&mut segment)?;
|
||||
Self::GetSlruSegment(PagestreamGetSlruSegmentResponse {
|
||||
req: PagestreamGetSlruSegmentRequest {
|
||||
hdr: PagestreamRequest {
|
||||
reqid,
|
||||
request_lsn,
|
||||
not_modified_since,
|
||||
},
|
||||
kind,
|
||||
segno,
|
||||
},
|
||||
segment: segment.into(),
|
||||
})
|
||||
}
|
||||
#[cfg(feature = "testing")]
|
||||
Tag::Test => {
|
||||
let reqid = buf.read_u64::<BigEndian>()?;
|
||||
let request_lsn = Lsn(buf.read_u64::<BigEndian>()?);
|
||||
let not_modified_since = Lsn(buf.read_u64::<BigEndian>()?);
|
||||
let batch_key = buf.read_u64::<BigEndian>()?;
|
||||
let len = buf.read_u64::<BigEndian>()?;
|
||||
let mut msg = vec![0; len as usize];
|
||||
buf.read_exact(&mut msg)?;
|
||||
let message = String::from_utf8(msg)?;
|
||||
Self::Test(PagestreamTestResponse {
|
||||
req: PagestreamTestRequest {
|
||||
hdr: PagestreamRequest {
|
||||
reqid,
|
||||
request_lsn,
|
||||
not_modified_since,
|
||||
},
|
||||
batch_key,
|
||||
message,
|
||||
},
|
||||
})
|
||||
}
|
||||
};
|
||||
let remaining = buf.into_inner();
|
||||
if !remaining.is_empty() {
|
||||
anyhow::bail!(
|
||||
"remaining bytes in msg with tag={msg_tag}: {}",
|
||||
remaining.len()
|
||||
);
|
||||
}
|
||||
Ok(ok)
|
||||
}
|
||||
|
||||
pub fn kind(&self) -> &'static str {
|
||||
match self {
|
||||
Self::Exists(_) => "Exists",
|
||||
Self::Nblocks(_) => "Nblocks",
|
||||
Self::GetPage(_) => "GetPage",
|
||||
Self::Error(_) => "Error",
|
||||
Self::DbSize(_) => "DbSize",
|
||||
Self::GetSlruSegment(_) => "GetSlruSegment",
|
||||
#[cfg(feature = "testing")]
|
||||
Self::Test(_) => "Test",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;

    /// Serializes a sample of frontend messages and checks that parsing the bytes
    /// with protocol V3 yields the original message again.
    #[test]
    fn test_pagestream() {
        // All samples share reqid 0 and request_lsn 4; only not_modified_since varies.
        let hdr = |not_modified_since: u64| PagestreamRequest {
            reqid: 0,
            request_lsn: Lsn(4),
            not_modified_since: Lsn(not_modified_since),
        };
        let rel = || RelTag {
            forknum: 1,
            spcnode: 2,
            dbnode: 3,
            relnode: 4,
        };

        let messages = vec![
            PagestreamFeMessage::Exists(PagestreamExistsRequest {
                hdr: hdr(3),
                rel: rel(),
            }),
            PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
                hdr: hdr(4),
                rel: rel(),
            }),
            PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
                hdr: hdr(3),
                rel: rel(),
                blkno: 7,
            }),
            PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
                hdr: hdr(3),
                dbnode: 7,
            }),
        ];

        for original in messages {
            let encoded = original.serialize();
            let decoded =
                PagestreamFeMessage::parse(&mut encoded.reader(), PagestreamProtocolVersion::V3)
                    .unwrap();
            assert!(original == decoded);
        }
    }
}
|
||||
@@ -1,9 +1,9 @@
|
||||
use std::cmp::Ordering;
|
||||
use std::fmt;
|
||||
|
||||
use postgres_ffi::Oid;
|
||||
use postgres_ffi::pg_constants::GLOBALTABLESPACE_OID;
|
||||
use postgres_ffi::relfile_utils::{MAIN_FORKNUM, forkname_to_number, forknumber_to_name};
|
||||
use postgres_ffi_types::Oid;
|
||||
use postgres_ffi_types::constants::GLOBALTABLESPACE_OID;
|
||||
use postgres_ffi_types::forknum::{MAIN_FORKNUM, forkname_to_number, forknumber_to_name};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
///
|
||||
|
||||
@@ -35,7 +35,7 @@ use std::hash::{Hash, Hasher};
|
||||
|
||||
#[doc(inline)]
|
||||
pub use ::utils::shard::*;
|
||||
use postgres_ffi::relfile_utils::INIT_FORKNUM;
|
||||
use postgres_ffi_types::forknum::INIT_FORKNUM;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::key::Key;
|
||||
|
||||
@@ -23,22 +23,12 @@ pub struct ReAttachRequest {
|
||||
pub register: Option<NodeRegisterRequest>,
|
||||
}
|
||||
|
||||
fn default_mode() -> LocationConfigMode {
|
||||
LocationConfigMode::AttachedSingle
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub struct ReAttachResponseTenant {
|
||||
pub id: TenantShardId,
|
||||
/// Mandatory if LocationConfigMode is None or set to an Attached* mode
|
||||
pub r#gen: Option<u32>,
|
||||
|
||||
/// Default value only for backward compat: this field should be set
|
||||
#[serde(default = "default_mode")]
|
||||
pub mode: LocationConfigMode,
|
||||
|
||||
// Default value only for backward compat: this field should be set
|
||||
#[serde(default = "ShardStripeSize::default")]
|
||||
pub stripe_size: ShardStripeSize,
|
||||
}
|
||||
#[derive(Serialize, Deserialize)]
|
||||
|
||||
@@ -16,6 +16,7 @@ memoffset.workspace = true
|
||||
pprof.workspace = true
|
||||
thiserror.workspace = true
|
||||
serde.workspace = true
|
||||
postgres_ffi_types.workspace = true
|
||||
utils.workspace = true
|
||||
tracing.workspace = true
|
||||
|
||||
|
||||
@@ -11,11 +11,7 @@
|
||||
|
||||
use crate::{BLCKSZ, PageHeaderData};
|
||||
|
||||
//
|
||||
// From pg_tablespace_d.h
|
||||
//
|
||||
pub const DEFAULTTABLESPACE_OID: u32 = 1663;
|
||||
pub const GLOBALTABLESPACE_OID: u32 = 1664;
|
||||
// Note: There are a few more widely-used constants in the `constants` module of the postgres_ffi_types crate.
|
||||
|
||||
// From storage_xlog.h
|
||||
pub const XLOG_SMGR_CREATE: u8 = 0x10;
|
||||
|
||||
@@ -4,50 +4,7 @@
|
||||
use once_cell::sync::OnceCell;
|
||||
use regex::Regex;
|
||||
|
||||
//
|
||||
// Fork numbers, from relpath.h
|
||||
//
|
||||
pub const MAIN_FORKNUM: u8 = 0;
|
||||
pub const FSM_FORKNUM: u8 = 1;
|
||||
pub const VISIBILITYMAP_FORKNUM: u8 = 2;
|
||||
pub const INIT_FORKNUM: u8 = 3;
|
||||
|
||||
#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)]
|
||||
pub enum FilePathError {
|
||||
#[error("invalid relation fork name")]
|
||||
InvalidForkName,
|
||||
#[error("invalid relation data file name")]
|
||||
InvalidFileName,
|
||||
}
|
||||
|
||||
impl From<core::num::ParseIntError> for FilePathError {
|
||||
fn from(_e: core::num::ParseIntError) -> Self {
|
||||
FilePathError::InvalidFileName
|
||||
}
|
||||
}
|
||||
|
||||
/// Convert Postgres relation file's fork suffix to fork number.
|
||||
pub fn forkname_to_number(forkname: Option<&str>) -> Result<u8, FilePathError> {
|
||||
match forkname {
|
||||
// "main" is not in filenames, it's implicit if the fork name is not present
|
||||
None => Ok(MAIN_FORKNUM),
|
||||
Some("fsm") => Ok(FSM_FORKNUM),
|
||||
Some("vm") => Ok(VISIBILITYMAP_FORKNUM),
|
||||
Some("init") => Ok(INIT_FORKNUM),
|
||||
Some(_) => Err(FilePathError::InvalidForkName),
|
||||
}
|
||||
}
|
||||
|
||||
/// Convert Postgres fork number to the right suffix of the relation data file.
|
||||
pub fn forknumber_to_name(forknum: u8) -> Option<&'static str> {
|
||||
match forknum {
|
||||
MAIN_FORKNUM => None,
|
||||
FSM_FORKNUM => Some("fsm"),
|
||||
VISIBILITYMAP_FORKNUM => Some("vm"),
|
||||
INIT_FORKNUM => Some("init"),
|
||||
_ => Some("UNKNOWN FORKNUM"),
|
||||
}
|
||||
}
|
||||
use postgres_ffi_types::forknum::*;
|
||||
|
||||
/// Parse a filename of a relation file. Returns (relfilenode, forknum, segno) tuple.
|
||||
///
|
||||
@@ -75,7 +32,9 @@ pub fn parse_relfilename(fname: &str) -> Result<(u32, u8, u32), FilePathError> {
|
||||
.ok_or(FilePathError::InvalidFileName)?;
|
||||
|
||||
let relnode_str = caps.name("relnode").unwrap().as_str();
|
||||
let relnode = relnode_str.parse::<u32>()?;
|
||||
let relnode = relnode_str
|
||||
.parse::<u32>()
|
||||
.map_err(|_e| FilePathError::InvalidFileName)?;
|
||||
|
||||
let forkname = caps.name("forkname").map(|f| f.as_str());
|
||||
let forknum = forkname_to_number(forkname)?;
|
||||
@@ -84,7 +43,11 @@ pub fn parse_relfilename(fname: &str) -> Result<(u32, u8, u32), FilePathError> {
|
||||
let segno = if segno_match.is_none() {
|
||||
0
|
||||
} else {
|
||||
segno_match.unwrap().as_str().parse::<u32>()?
|
||||
segno_match
|
||||
.unwrap()
|
||||
.as_str()
|
||||
.parse::<u32>()
|
||||
.map_err(|_e| FilePathError::InvalidFileName)?
|
||||
};
|
||||
|
||||
Ok((relnode, forknum, segno))
|
||||
|
||||
11
libs/postgres_ffi_types/Cargo.toml
Normal file
11
libs/postgres_ffi_types/Cargo.toml
Normal file
@@ -0,0 +1,11 @@
|
||||
[package]
|
||||
name = "postgres_ffi_types"
|
||||
version = "0.1.0"
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
thiserror.workspace = true
|
||||
workspace_hack = { version = "0.1", path = "../../workspace_hack" }
|
||||
|
||||
[dev-dependencies]
|
||||
8
libs/postgres_ffi_types/src/constants.rs
Normal file
8
libs/postgres_ffi_types/src/constants.rs
Normal file
@@ -0,0 +1,8 @@
|
||||
//! Misc constants, copied from PostgreSQL headers.
|
||||
//!
|
||||
//! Any constants included here must be the same in all PostgreSQL versions and unlikely to change
|
||||
//! in the future either!
|
||||
|
||||
// From pg_tablespace_d.h
|
||||
pub const DEFAULTTABLESPACE_OID: u32 = 1663;
|
||||
pub const GLOBALTABLESPACE_OID: u32 = 1664;
|
||||
36
libs/postgres_ffi_types/src/forknum.rs
Normal file
36
libs/postgres_ffi_types/src/forknum.rs
Normal file
@@ -0,0 +1,36 @@
|
||||
// Fork numbers, from relpath.h
|
||||
pub const MAIN_FORKNUM: u8 = 0;
|
||||
pub const FSM_FORKNUM: u8 = 1;
|
||||
pub const VISIBILITYMAP_FORKNUM: u8 = 2;
|
||||
pub const INIT_FORKNUM: u8 = 3;
|
||||
|
||||
/// Errors reported when interpreting the name of a relation data file.
#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)]
|
||||
pub enum FilePathError {
|
||||
/// The fork suffix is not one of the names `forkname_to_number` recognizes.
#[error("invalid relation fork name")]
|
||||
InvalidForkName,
|
||||
/// NOTE(review): never constructed in this module; presumably raised by callers
/// that parse whole relation file names — confirm.
#[error("invalid relation data file name")]
|
||||
InvalidFileName,
|
||||
}
|
||||
|
||||
/// Convert Postgres relation file's fork suffix to fork number.
|
||||
pub fn forkname_to_number(forkname: Option<&str>) -> Result<u8, FilePathError> {
|
||||
match forkname {
|
||||
// "main" is not in filenames, it's implicit if the fork name is not present
|
||||
None => Ok(MAIN_FORKNUM),
|
||||
Some("fsm") => Ok(FSM_FORKNUM),
|
||||
Some("vm") => Ok(VISIBILITYMAP_FORKNUM),
|
||||
Some("init") => Ok(INIT_FORKNUM),
|
||||
Some(_) => Err(FilePathError::InvalidForkName),
|
||||
}
|
||||
}
|
||||
|
||||
/// Convert Postgres fork number to the right suffix of the relation data file.
|
||||
pub fn forknumber_to_name(forknum: u8) -> Option<&'static str> {
|
||||
match forknum {
|
||||
MAIN_FORKNUM => None,
|
||||
FSM_FORKNUM => Some("fsm"),
|
||||
VISIBILITYMAP_FORKNUM => Some("vm"),
|
||||
INIT_FORKNUM => Some("init"),
|
||||
_ => Some("UNKNOWN FORKNUM"),
|
||||
}
|
||||
}
|
||||
13
libs/postgres_ffi_types/src/lib.rs
Normal file
13
libs/postgres_ffi_types/src/lib.rs
Normal file
@@ -0,0 +1,13 @@
|
||||
//! This package contains some PostgreSQL constants and datatypes that are the same in all versions
|
||||
//! of PostgreSQL and unlikely to change in the future either. These could be derived from the
|
||||
//! PostgreSQL headers with 'bindgen', but in order to avoid proliferating the dependency on bindgen
|
||||
//! and the PostgreSQL C headers to all services, we prefer to have this small stand-alone crate for
|
||||
//! them instead.
|
||||
//!
|
||||
//! Be mindful in what you add here, as these types are deeply ingrained in the APIs.
|
||||
|
||||
pub mod constants;
|
||||
pub mod forknum;
|
||||
|
||||
pub type Oid = u32;
|
||||
pub type RepOriginId = u16;
|
||||
@@ -1,5 +1,3 @@
|
||||
use std::io;
|
||||
|
||||
use tokio::net::TcpStream;
|
||||
|
||||
use crate::client::SocketConfig;
|
||||
@@ -8,7 +6,7 @@ use crate::tls::MakeTlsConnect;
|
||||
use crate::{Error, cancel_query_raw, connect_socket};
|
||||
|
||||
pub(crate) async fn cancel_query<T>(
|
||||
config: Option<SocketConfig>,
|
||||
config: SocketConfig,
|
||||
ssl_mode: SslMode,
|
||||
tls: T,
|
||||
process_id: i32,
|
||||
@@ -17,16 +15,6 @@ pub(crate) async fn cancel_query<T>(
|
||||
where
|
||||
T: MakeTlsConnect<TcpStream>,
|
||||
{
|
||||
let config = match config {
|
||||
Some(config) => config,
|
||||
None => {
|
||||
return Err(Error::connect(io::Error::new(
|
||||
io::ErrorKind::InvalidInput,
|
||||
"unknown host",
|
||||
)));
|
||||
}
|
||||
};
|
||||
|
||||
let hostname = match &config.host {
|
||||
Host::Tcp(host) => &**host,
|
||||
};
|
||||
|
||||
@@ -7,11 +7,16 @@ use crate::config::SslMode;
|
||||
use crate::tls::{MakeTlsConnect, TlsConnect};
|
||||
use crate::{Error, cancel_query, cancel_query_raw};
|
||||
|
||||
/// The capability to request cancellation of in-progress queries on a
|
||||
/// connection.
|
||||
#[derive(Clone, Serialize, Deserialize)]
|
||||
/// A cancellation token that allows easy cancellation of a query.
|
||||
#[derive(Clone)]
|
||||
pub struct CancelToken {
|
||||
pub socket_config: Option<SocketConfig>,
|
||||
pub socket_config: SocketConfig,
|
||||
pub raw: RawCancelToken,
|
||||
}
|
||||
|
||||
/// A raw cancellation token that allows cancellation of a query, given a fresh connection to postgres.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct RawCancelToken {
|
||||
pub ssl_mode: SslMode,
|
||||
pub process_id: i32,
|
||||
pub secret_key: i32,
|
||||
@@ -36,14 +41,16 @@ impl CancelToken {
|
||||
{
|
||||
cancel_query::cancel_query(
|
||||
self.socket_config.clone(),
|
||||
self.ssl_mode,
|
||||
self.raw.ssl_mode,
|
||||
tls,
|
||||
self.process_id,
|
||||
self.secret_key,
|
||||
self.raw.process_id,
|
||||
self.raw.secret_key,
|
||||
)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
impl RawCancelToken {
|
||||
/// Like `cancel_query`, but uses a stream which is already connected to the server rather than opening a new
|
||||
/// connection itself.
|
||||
pub async fn cancel_query_raw<S, T>(&self, stream: S, tls: T) -> Result<(), Error>
|
||||
|
||||
@@ -12,6 +12,7 @@ use postgres_protocol2::message::frontend;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
use crate::cancel_token::RawCancelToken;
|
||||
use crate::codec::{BackendMessages, FrontendMessage};
|
||||
use crate::config::{Host, SslMode};
|
||||
use crate::query::RowStream;
|
||||
@@ -331,10 +332,12 @@ impl Client {
|
||||
/// connection associated with this client.
|
||||
pub fn cancel_token(&self) -> CancelToken {
|
||||
CancelToken {
|
||||
socket_config: Some(self.socket_config.clone()),
|
||||
ssl_mode: self.ssl_mode,
|
||||
process_id: self.process_id,
|
||||
secret_key: self.secret_key,
|
||||
socket_config: self.socket_config.clone(),
|
||||
raw: RawCancelToken {
|
||||
ssl_mode: self.ssl_mode,
|
||||
process_id: self.process_id,
|
||||
secret_key: self.secret_key,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
|
||||
use postgres_protocol2::message::backend::ReadyForQueryBody;
|
||||
|
||||
pub use crate::cancel_token::CancelToken;
|
||||
pub use crate::cancel_token::{CancelToken, RawCancelToken};
|
||||
pub use crate::client::{Client, SocketConfig};
|
||||
pub use crate::config::Config;
|
||||
pub use crate::connect_raw::RawConnection;
|
||||
|
||||
@@ -87,6 +87,28 @@ pub enum RemoteStorageKind {
|
||||
AzureContainer(AzureConfig),
|
||||
}
|
||||
|
||||
/// Version of `RemoteStorageKind` that deserializes from an internally tagged form:
/// the `type` field selects the variant (`LocalFs`, `AwsS3` or `AzureContainer`).
/// Needed for endpoint storage service.
#[derive(Deserialize)]
|
||||
#[serde(tag = "type")]
|
||||
|
||||
|
||||
pub enum TypedRemoteStorageKind {
|
||||
LocalFs { local_path: Utf8PathBuf },
|
||||
AwsS3(S3Config),
|
||||
AzureContainer(AzureConfig),
|
||||
}
|
||||
|
||||
impl From<TypedRemoteStorageKind> for RemoteStorageKind {
|
||||
fn from(value: TypedRemoteStorageKind) -> Self {
|
||||
match value {
|
||||
TypedRemoteStorageKind::LocalFs { local_path } => {
|
||||
RemoteStorageKind::LocalFs { local_path }
|
||||
}
|
||||
TypedRemoteStorageKind::AwsS3(v) => RemoteStorageKind::AwsS3(v),
|
||||
TypedRemoteStorageKind::AzureContainer(v) => RemoteStorageKind::AzureContainer(v),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// AWS S3 bucket coordinates and access credentials to manage the bucket contents (read and write).
|
||||
#[derive(Clone, PartialEq, Eq, Deserialize, Serialize)]
|
||||
pub struct S3Config {
|
||||
|
||||
@@ -31,6 +31,7 @@ use anyhow::Context;
|
||||
pub use azure_core::Etag;
|
||||
use bytes::Bytes;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
pub use config::TypedRemoteStorageKind;
|
||||
pub use error::{DownloadError, TimeTravelError, TimeoutOrCancel};
|
||||
use futures::StreamExt;
|
||||
use futures::stream::Stream;
|
||||
@@ -676,6 +677,15 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
|
||||
}
|
||||
|
||||
impl GenericRemoteStorage {
|
||||
pub async fn from_storage_kind(kind: TypedRemoteStorageKind) -> anyhow::Result<Self> {
|
||||
Self::from_config(&RemoteStorageConfig {
|
||||
storage: kind.into(),
|
||||
timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
|
||||
small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT,
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn from_config(storage_config: &RemoteStorageConfig) -> anyhow::Result<Self> {
|
||||
let timeout = storage_config.timeout;
|
||||
|
||||
|
||||
@@ -14,6 +14,7 @@ bytes.workspace = true
|
||||
pageserver_api.workspace = true
|
||||
prost.workspace = true
|
||||
postgres_ffi.workspace = true
|
||||
postgres_ffi_types.workspace = true
|
||||
serde.workspace = true
|
||||
thiserror.workspace = true
|
||||
tokio = { workspace = true, features = ["io-util"] }
|
||||
|
||||
@@ -8,8 +8,8 @@ use pageserver_api::key::rel_block_to_key;
|
||||
use pageserver_api::reltag::{RelTag, SlruKind};
|
||||
use pageserver_api::shard::ShardIdentity;
|
||||
use postgres_ffi::pg_constants;
|
||||
use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM;
|
||||
use postgres_ffi::walrecord::*;
|
||||
use postgres_ffi_types::forknum::VISIBILITYMAP_FORKNUM;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use crate::models::*;
|
||||
|
||||
@@ -25,6 +25,9 @@
|
||||
//! |
|
||||
//! |--> write to KV store within the pageserver
|
||||
|
||||
pub mod record;
|
||||
pub mod value;
|
||||
|
||||
use bytes::Bytes;
|
||||
use pageserver_api::reltag::{RelTag, SlruKind};
|
||||
use postgres_ffi::walrecord::{
|
||||
|
||||
@@ -10,7 +10,7 @@
|
||||
use bytes::Bytes;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::record::NeonWalRecord;
|
||||
use crate::models::record::NeonWalRecord;
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
pub enum Value {
|
||||
@@ -1,4 +1,4 @@
|
||||
//! This module implements batch type for serialized [`pageserver_api::value::Value`]
|
||||
//! This module implements batch type for serialized [`crate::models::value::Value`]
|
||||
//! instances. Each batch contains a raw buffer (serialized values)
|
||||
//! and a list of metadata for each (key, LSN) tuple present in the batch.
|
||||
//!
|
||||
@@ -10,10 +10,8 @@ use std::collections::{BTreeSet, HashMap};
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use pageserver_api::key::{CompactKey, Key, rel_block_to_key};
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::record::NeonWalRecord;
|
||||
use pageserver_api::reltag::RelTag;
|
||||
use pageserver_api::shard::ShardIdentity;
|
||||
use pageserver_api::value::Value;
|
||||
use postgres_ffi::walrecord::{DecodedBkpBlock, DecodedWALRecord};
|
||||
use postgres_ffi::{BLCKSZ, page_is_new, page_set_lsn, pg_constants};
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -21,6 +19,8 @@ use utils::bin_ser::BeSer;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use crate::models::InterpretedWalRecord;
|
||||
use crate::models::record::NeonWalRecord;
|
||||
use crate::models::value::Value;
|
||||
|
||||
static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
|
||||
|
||||
|
||||
@@ -13,22 +13,24 @@ fn main() -> anyhow::Result<()> {
|
||||
// Tell cargo to invalidate the built crate whenever the wrapper changes
|
||||
println!("cargo:rerun-if-changed=bindgen_deps.h");
|
||||
|
||||
let root_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("../..");
|
||||
|
||||
// Finding the location of built libraries and Postgres C headers:
|
||||
// - if POSTGRES_INSTALL_DIR is set look into it, otherwise look into `<project_root>/pg_install`
|
||||
// - if there's a `bin/pg_config` file use it for getting include server, otherwise use `<project_root>/pg_install/{PG_MAJORVERSION}/include/postgresql/server`
|
||||
let pg_install_dir = if let Some(postgres_install_dir) = env::var_os("POSTGRES_INSTALL_DIR") {
|
||||
postgres_install_dir.into()
|
||||
} else {
|
||||
PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("../../pg_install")
|
||||
root_path.join("pg_install")
|
||||
};
|
||||
|
||||
let pg_install_abs = std::fs::canonicalize(pg_install_dir)?;
|
||||
let walproposer_lib_dir = pg_install_abs.join("build/walproposer-lib");
|
||||
let walproposer_lib_dir = root_path.join("build/walproposer-lib");
|
||||
let walproposer_lib_search_str = walproposer_lib_dir
|
||||
.to_str()
|
||||
.ok_or(anyhow!("Bad non-UTF path"))?;
|
||||
|
||||
let pgxn_neon = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("../../pgxn/neon");
|
||||
let pgxn_neon = root_path.join("pgxn/neon");
|
||||
let pgxn_neon = std::fs::canonicalize(pgxn_neon)?;
|
||||
let pgxn_neon = pgxn_neon.to_str().ok_or(anyhow!("Bad non-UTF path"))?;
|
||||
|
||||
|
||||
@@ -311,7 +311,7 @@ extern "C" fn get_redo_start_lsn(wp: *mut WalProposer) -> XLogRecPtr {
|
||||
}
|
||||
}
|
||||
|
||||
extern "C-unwind" fn finish_sync_safekeepers(wp: *mut WalProposer, lsn: XLogRecPtr) {
|
||||
unsafe extern "C-unwind" fn finish_sync_safekeepers(wp: *mut WalProposer, lsn: XLogRecPtr) -> ! {
|
||||
unsafe {
|
||||
let callback_data = (*(*wp).config).callback_data;
|
||||
let api = callback_data as *mut Box<dyn ApiImpl>;
|
||||
|
||||
@@ -144,7 +144,7 @@ pub trait ApiImpl {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn finish_sync_safekeepers(&self, _lsn: u64) {
|
||||
fn finish_sync_safekeepers(&self, _lsn: u64) -> ! {
|
||||
todo!()
|
||||
}
|
||||
|
||||
@@ -469,7 +469,7 @@ mod tests {
|
||||
true
|
||||
}
|
||||
|
||||
fn finish_sync_safekeepers(&self, lsn: u64) {
|
||||
fn finish_sync_safekeepers(&self, lsn: u64) -> ! {
|
||||
self.sync_channel.send(lsn).unwrap();
|
||||
panic!("sync safekeepers finished at lsn={}", lsn);
|
||||
}
|
||||
|
||||
@@ -56,6 +56,7 @@ pin-project-lite.workspace = true
|
||||
postgres_backend.workspace = true
|
||||
postgres_connection.workspace = true
|
||||
postgres_ffi.workspace = true
|
||||
postgres_ffi_types.workspace = true
|
||||
postgres_initdb.workspace = true
|
||||
postgres-protocol.workspace = true
|
||||
postgres-types.workspace = true
|
||||
|
||||
@@ -13,11 +13,11 @@ use pageserver::{page_cache, virtual_file};
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::models::virtual_file::IoMode;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use pageserver_api::value::Value;
|
||||
use strum::IntoEnumIterator;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::bin_ser::BeSer;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
use wal_decoder::models::value::Value;
|
||||
use wal_decoder::serialized_batch::SerializedValueBatch;
|
||||
|
||||
// A very cheap hash for generating non-sequential keys.
|
||||
|
||||
@@ -67,12 +67,12 @@ use once_cell::sync::Lazy;
|
||||
use pageserver::config::PageServerConf;
|
||||
use pageserver::walredo::{PostgresRedoManager, RedoAttemptType};
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::record::NeonWalRecord;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use tokio::sync::Barrier;
|
||||
use tokio::task::JoinSet;
|
||||
use utils::id::TenantId;
|
||||
use utils::lsn::Lsn;
|
||||
use wal_decoder::models::record::NeonWalRecord;
|
||||
|
||||
fn bench(c: &mut Criterion) {
|
||||
macro_rules! bench_group {
|
||||
|
||||
@@ -2,7 +2,7 @@ use std::sync::{Arc, Mutex};
|
||||
|
||||
use futures::stream::{SplitSink, SplitStream};
|
||||
use futures::{SinkExt, StreamExt};
|
||||
use pageserver_api::models::{
|
||||
use pageserver_api::pagestream_api::{
|
||||
PagestreamBeMessage, PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse,
|
||||
};
|
||||
use pageserver_api::reltag::RelTag;
|
||||
|
||||
21
pageserver/communicator_pools/client_cache/Cargo.toml
Normal file
21
pageserver/communicator_pools/client_cache/Cargo.toml
Normal file
@@ -0,0 +1,21 @@
|
||||
[package]
|
||||
name = "client_cache"
|
||||
version = "0.1.0"
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
async-trait.workspace = true
|
||||
bytes.workspace = true
|
||||
futures.workspace = true
|
||||
hyper-util.workspace = true
|
||||
http.workspace = true
|
||||
priority-queue = "2.3.1"
|
||||
rand.workspace = true
|
||||
tonic.workspace = true
|
||||
tokio.workspace = true
|
||||
tokio-util.workspace = true
|
||||
tower.workspace = true
|
||||
uuid.workspace = true
|
||||
workspace_hack.workspace = true
|
||||
|
||||
105
pageserver/communicator_pools/client_cache/src/lib.rs
Normal file
105
pageserver/communicator_pools/client_cache/src/lib.rs
Normal file
@@ -0,0 +1,105 @@
|
||||
use async_trait::async_trait;
|
||||
use priority_queue::PriorityQueue;
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
sync::Arc,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
use tokio::sync::{Mutex, OwnedSemaphorePermit, Semaphore};
|
||||
|
||||
/// Factory for the items of type `T` that a `ClientCache` holds.
#[async_trait]
|
||||
pub trait PooledClientFactory<T>: Send + Sync + 'static {
|
||||
/// Create a new pooled item.
///
/// The result is nested: the outer error is `tokio::time::error::Elapsed`, the
/// inner one `tonic::Status`.
/// NOTE(review): presumably the outer error means the attempt exceeded
/// `connect_timeout` — confirm against the implementors.
|
||||
async fn create(
|
||||
&self,
|
||||
connect_timeout: Duration,
|
||||
) -> Result<Result<T, tonic::Status>, tokio::time::error::Elapsed>;
|
||||
}
|
||||
|
||||
/// A pooled gRPC client with capacity tracking and error handling.
|
||||
#[allow(dead_code)]
|
||||
pub struct ClientCache<T> {
|
||||
// Mutable pool state, guarded by an async mutex.
inner: Mutex<Inner<T>>,
|
||||
|
||||
// Factory handed to `new`; its `create` method produces new pooled items.
fact: Arc<dyn PooledClientFactory<T> + Send + Sync>,
|
||||
|
||||
// NOTE(review): presumably the timeout passed to `PooledClientFactory::create` — confirm.
connect_timeout: Duration,
|
||||
// NOTE(review): presumably the pause after a failed connect before retrying — confirm.
connect_backoff: Duration,
|
||||
|
||||
/// The maximum number of consumers that can use a single connection.
|
||||
max_consumers: usize,
|
||||
|
||||
/// The number of consecutive errors before a connection is removed from the pool.
|
||||
error_threshold: usize,
|
||||
|
||||
/// The maximum duration a connection can be idle before being removed.
|
||||
max_idle_duration: Duration,
|
||||
// NOTE(review): presumably an upper bound on the number of pooled connections — confirm.
max_total_connections: usize,
|
||||
|
||||
// Created with zero permits in `new`.
client_semaphore: Arc<Semaphore>,
|
||||
}
|
||||
|
||||
// Pool state stored behind the mutex in `ClientCache::inner`.
#[allow(dead_code)]
|
||||
struct Inner<T> {
|
||||
// Cache entries, keyed by UUID.
entries: HashMap<uuid::Uuid, CacheEntry<T>>,
|
||||
// Priority queue over the same UUID key type.
// NOTE(review): the meaning of the `usize` priority is not visible here — confirm.
pq: PriorityQueue<uuid::Uuid, usize>,
|
||||
// This is updated when a connection is dropped, or we fail
|
||||
// to create a new connection.
|
||||
last_connect_failure: Option<Instant>,
|
||||
// NOTE(review): presumably the number of callers waiting for a client — confirm.
waiters: usize,
|
||||
// NOTE(review): presumably the number of connection attempts in flight — confirm.
in_progress: usize,
|
||||
}
|
||||
|
||||
// One entry of `Inner::entries`: a client plus its bookkeeping counters.
#[allow(dead_code)]
|
||||
struct CacheEntry<T> {
|
||||
client: T,
|
||||
// NOTE(review): presumably compared against `ClientCache::max_consumers` — confirm.
active_consumers: usize,
|
||||
// NOTE(review): presumably compared against `ClientCache::error_threshold` — confirm.
consecutive_errors: usize,
|
||||
// NOTE(review): presumably compared against `ClientCache::max_idle_duration` — confirm.
last_used: Instant,
|
||||
}
|
||||
|
||||
/// A client borrowed from the pool.
|
||||
#[allow(dead_code)]
|
||||
pub struct PooledClient<T> {
|
||||
// The borrowed client; `PooledClient::client` returns a clone of it.
pub client: T,
|
||||
// The cache this client was borrowed from.
pool: Arc<ClientCache<T>>,
|
||||
// NOTE(review): presumably records whether the client was used without error — confirm.
is_ok: bool,
|
||||
// Same key type as `Inner::entries`; presumably identifies the originating entry — confirm.
id: uuid::Uuid,
|
||||
// Owned semaphore permit kept for as long as this value exists.
permit: OwnedSemaphorePermit,
|
||||
}
|
||||
|
||||
impl<T: Clone + Send + 'static> ClientCache<T> {
|
||||
pub fn new(
|
||||
fact: Arc<dyn PooledClientFactory<T> + Send + Sync>,
|
||||
connect_timeout: Duration,
|
||||
connect_backoff: Duration,
|
||||
max_consumers: usize,
|
||||
error_threshold: usize,
|
||||
max_idle_duration: Duration,
|
||||
max_total_connections: usize,
|
||||
) -> Arc<Self> {
|
||||
Arc::new(Self {
|
||||
inner: Mutex::new(Inner::<T> {
|
||||
entries: HashMap::new(),
|
||||
pq: PriorityQueue::new(),
|
||||
last_connect_failure: None,
|
||||
waiters: 0,
|
||||
in_progress: 0,
|
||||
}),
|
||||
fact: Arc::clone(&fact),
|
||||
connect_timeout,
|
||||
connect_backoff,
|
||||
max_consumers,
|
||||
error_threshold,
|
||||
max_idle_duration,
|
||||
max_total_connections,
|
||||
client_semaphore: Arc::new(Semaphore::new(0)),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Clone + Send + 'static> PooledClient<T> {
|
||||
pub fn client(&self) -> T {
|
||||
self.client.clone()
|
||||
}
|
||||
}
|
||||
8
pageserver/communicator_pools/request_tracker/Cargo.toml
Normal file
8
pageserver/communicator_pools/request_tracker/Cargo.toml
Normal file
@@ -0,0 +1,8 @@
|
||||
[package]
|
||||
name = "request_tracker"
|
||||
version = "0.1.0"
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
workspace_hack.workspace = true
|
||||
15
pageserver/communicator_pools/request_tracker/src/lib.rs
Normal file
15
pageserver/communicator_pools/request_tracker/src/lib.rs
Normal file
@@ -0,0 +1,15 @@
|
||||
// Placeholder API; to be replaced once the request tracker is implemented.

/// Returns the sum of `left` and `right`.
pub fn add(left: u64, right: u64) -> u64 {
    let sum = left + right;
    sum
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::add;

    /// Sanity check for the placeholder `add` function.
    #[test]
    fn it_works() {
        assert_eq!(add(2, 2), 4);
    }
}
|
||||
@@ -5,11 +5,16 @@ edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
bytes.workspace = true
|
||||
futures.workspace = true
|
||||
pageserver_api.workspace = true
|
||||
postgres_ffi.workspace = true
|
||||
prost.workspace = true
|
||||
strum.workspace = true
|
||||
strum_macros.workspace = true
|
||||
thiserror.workspace = true
|
||||
tokio.workspace = true
|
||||
tonic.workspace = true
|
||||
utils.workspace = true
|
||||
workspace_hack.workspace = true
|
||||
|
||||
@@ -108,6 +108,8 @@ message GetBaseBackupRequest {
|
||||
uint64 lsn = 1;
|
||||
// If true, logical replication slots will not be created.
|
||||
bool replica = 2;
|
||||
// If true, include relation files in the base backup. Mainly for debugging and tests.
|
||||
bool full = 3;
|
||||
}
|
||||
|
||||
// Base backup response chunk, returned as an ordered stream.
|
||||
|
||||
200
pageserver/page_api/src/client.rs
Normal file
200
pageserver/page_api/src/client.rs
Normal file
@@ -0,0 +1,200 @@
|
||||
use std::convert::TryInto;
|
||||
|
||||
use bytes::Bytes;
|
||||
use futures::TryStreamExt;
|
||||
use futures::{Stream, StreamExt};
|
||||
use tonic::metadata::AsciiMetadataValue;
|
||||
use tonic::metadata::errors::InvalidMetadataValue;
|
||||
use tonic::transport::Channel;
|
||||
use tonic::{Request, Streaming};
|
||||
|
||||
use utils::id::TenantId;
|
||||
use utils::id::TimelineId;
|
||||
use utils::shard::ShardIndex;
|
||||
|
||||
use anyhow::Result;
|
||||
|
||||
use crate::model;
|
||||
use crate::proto;
|
||||
|
||||
///
|
||||
/// AuthInterceptor adds tenant, timeline, shard, and (if configured) authorization
|
||||
/// metadata to each request. These headers are required at the pageserver.
|
||||
///
|
||||
#[derive(Clone)]
|
||||
struct AuthInterceptor {
|
||||
// Sent as the `neon-tenant-id` metadata entry.
tenant_id: AsciiMetadataValue,
|
||||
// Sent as the `neon-timeline-id` metadata entry.
timeline_id: AsciiMetadataValue,
|
||||
// Sent as the `neon-shard-id` metadata entry.
shard_id: AsciiMetadataValue,
|
||||
// Sent as the `authorization` metadata entry when present.
auth_header: Option<AsciiMetadataValue>, // including "Bearer " prefix
|
||||
}
|
||||
|
||||
impl AuthInterceptor {
|
||||
fn new(
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
auth_token: Option<String>,
|
||||
shard_id: ShardIndex,
|
||||
) -> Result<Self, InvalidMetadataValue> {
|
||||
let tenant_ascii: AsciiMetadataValue = tenant_id.to_string().try_into()?;
|
||||
let timeline_ascii: AsciiMetadataValue = timeline_id.to_string().try_into()?;
|
||||
let shard_ascii: AsciiMetadataValue = shard_id.to_string().try_into()?;
|
||||
|
||||
let auth_header: Option<AsciiMetadataValue> = match auth_token {
|
||||
Some(token) => Some(format!("Bearer {token}").try_into()?),
|
||||
None => None,
|
||||
};
|
||||
|
||||
Ok(Self {
|
||||
tenant_id: tenant_ascii,
|
||||
shard_id: shard_ascii,
|
||||
timeline_id: timeline_ascii,
|
||||
auth_header,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl tonic::service::Interceptor for AuthInterceptor {
    /// Inserts the `neon-tenant-id`, `neon-shard-id` and `neon-timeline-id` metadata entries
    /// into the request, plus `authorization` when an auth header is configured. Never fails.
    fn call(&mut self, mut req: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
        let metadata = req.metadata_mut();
        metadata.insert("neon-tenant-id", self.tenant_id.clone());
        metadata.insert("neon-shard-id", self.shard_id.clone());
        metadata.insert("neon-timeline-id", self.timeline_id.clone());

        if let Some(auth_header) = self.auth_header.as_ref() {
            metadata.insert("authorization", auth_header.clone());
        }

        Ok(req)
    }
}
|
||||
/// Pageserver gRPC client: a generated `PageServiceClient` over a `Channel`, wrapped with
/// `AuthInterceptor`.
#[derive(Clone)]
|
||||
pub struct Client {
|
||||
// The underlying generated client; every method on `Client` delegates to it.
client: proto::PageServiceClient<
|
||||
tonic::service::interceptor::InterceptedService<Channel, AuthInterceptor>,
|
||||
>,
|
||||
}
|
||||
|
||||
impl Client {
|
||||
pub async fn new<T: TryInto<tonic::transport::Endpoint> + Send + Sync + 'static>(
|
||||
into_endpoint: T,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
shard_id: ShardIndex,
|
||||
auth_header: Option<String>,
|
||||
compression: Option<tonic::codec::CompressionEncoding>,
|
||||
) -> anyhow::Result<Self> {
|
||||
let endpoint: tonic::transport::Endpoint = into_endpoint
|
||||
.try_into()
|
||||
.map_err(|_e| anyhow::anyhow!("failed to convert endpoint"))?;
|
||||
let channel = endpoint.connect().await?;
|
||||
let auth = AuthInterceptor::new(tenant_id, timeline_id, auth_header, shard_id)
|
||||
.map_err(|e| anyhow::anyhow!(e.to_string()))?;
|
||||
let mut client = proto::PageServiceClient::with_interceptor(channel, auth);
|
||||
|
||||
if let Some(compression) = compression {
|
||||
// TODO: benchmark this (including network latency).
|
||||
// TODO: consider enabling compression by default.
|
||||
client = client
|
||||
.accept_compressed(compression)
|
||||
.send_compressed(compression);
|
||||
}
|
||||
|
||||
Ok(Self { client })
|
||||
}
|
||||
|
||||
/// Returns whether a relation exists.
|
||||
pub async fn check_rel_exists(
|
||||
&mut self,
|
||||
req: model::CheckRelExistsRequest,
|
||||
) -> Result<model::CheckRelExistsResponse, tonic::Status> {
|
||||
let proto_req = proto::CheckRelExistsRequest::from(req);
|
||||
|
||||
let response = self.client.check_rel_exists(proto_req).await?;
|
||||
|
||||
let proto_resp = response.into_inner();
|
||||
Ok(proto_resp.into())
|
||||
}
|
||||
|
||||
/// Fetches a base backup.
|
||||
pub async fn get_base_backup(
|
||||
&mut self,
|
||||
req: model::GetBaseBackupRequest,
|
||||
) -> Result<impl Stream<Item = Result<Bytes, tonic::Status>> + 'static, tonic::Status> {
|
||||
let proto_req = proto::GetBaseBackupRequest::from(req);
|
||||
|
||||
let response_stream: Streaming<proto::GetBaseBackupResponseChunk> =
|
||||
self.client.get_base_backup(proto_req).await?.into_inner();
|
||||
|
||||
// TODO: Consider dechunking internally
|
||||
let domain_stream = response_stream.map(|chunk_res| {
|
||||
chunk_res.and_then(|proto_chunk| {
|
||||
proto_chunk.try_into().map_err(|e| {
|
||||
tonic::Status::internal(format!("Failed to convert response chunk: {}", e))
|
||||
})
|
||||
})
|
||||
});
|
||||
|
||||
Ok(domain_stream)
|
||||
}
|
||||
|
||||
/// Returns the total size of a database, as # of bytes.
|
||||
pub async fn get_db_size(
|
||||
&mut self,
|
||||
req: model::GetDbSizeRequest,
|
||||
) -> Result<u64, tonic::Status> {
|
||||
let proto_req = proto::GetDbSizeRequest::from(req);
|
||||
|
||||
let response = self.client.get_db_size(proto_req).await?;
|
||||
Ok(response.into_inner().into())
|
||||
}
|
||||
|
||||
/// Fetches pages.
|
||||
///
|
||||
/// This is implemented as a bidirectional streaming RPC for performance.
|
||||
/// Per-request errors are often returned as status_code instead of errors,
|
||||
/// to avoid tearing down the entire stream via tonic::Status.
|
||||
pub async fn get_pages<ReqSt>(
|
||||
&mut self,
|
||||
inbound: ReqSt,
|
||||
) -> Result<
|
||||
impl Stream<Item = Result<model::GetPageResponse, tonic::Status>> + Send + 'static,
|
||||
tonic::Status,
|
||||
>
|
||||
where
|
||||
ReqSt: Stream<Item = model::GetPageRequest> + Send + 'static,
|
||||
{
|
||||
let outbound_proto = inbound.map(|domain_req| domain_req.into());
|
||||
|
||||
let req_new = Request::new(outbound_proto);
|
||||
|
||||
let response_stream: Streaming<proto::GetPageResponse> =
|
||||
self.client.get_pages(req_new).await?.into_inner();
|
||||
|
||||
let domain_stream = response_stream.map_ok(model::GetPageResponse::from);
|
||||
|
||||
Ok(domain_stream)
|
||||
}
|
||||
|
||||
/// Returns the size of a relation, as # of blocks.
|
||||
pub async fn get_rel_size(
|
||||
&mut self,
|
||||
req: model::GetRelSizeRequest,
|
||||
) -> Result<model::GetRelSizeResponse, tonic::Status> {
|
||||
let proto_req = proto::GetRelSizeRequest::from(req);
|
||||
let response = self.client.get_rel_size(proto_req).await?;
|
||||
let proto_resp = response.into_inner();
|
||||
Ok(proto_resp.into())
|
||||
}
|
||||
|
||||
/// Fetches an SLRU segment.
|
||||
pub async fn get_slru_segment(
|
||||
&mut self,
|
||||
req: model::GetSlruSegmentRequest,
|
||||
) -> Result<model::GetSlruSegmentResponse, tonic::Status> {
|
||||
let proto_req = proto::GetSlruSegmentRequest::from(req);
|
||||
let response = self.client.get_slru_segment(proto_req).await?;
|
||||
Ok(response.into_inner().try_into()?)
|
||||
}
|
||||
}
|
||||
@@ -18,6 +18,8 @@ pub mod proto {
|
||||
pub use page_service_server::{PageService, PageServiceServer};
|
||||
}
|
||||
|
||||
mod client;
|
||||
pub use client::Client;
|
||||
mod model;
|
||||
|
||||
pub use model::*;
|
||||
|
||||
@@ -189,6 +189,8 @@ pub struct GetBaseBackupRequest {
|
||||
pub lsn: Option<Lsn>,
|
||||
/// If true, logical replication slots will not be created.
|
||||
pub replica: bool,
|
||||
/// If true, include relation files in the base backup. Mainly for debugging and tests.
|
||||
pub full: bool,
|
||||
}
|
||||
|
||||
impl From<proto::GetBaseBackupRequest> for GetBaseBackupRequest {
|
||||
@@ -196,6 +198,7 @@ impl From<proto::GetBaseBackupRequest> for GetBaseBackupRequest {
|
||||
Self {
|
||||
lsn: (pb.lsn != 0).then_some(Lsn(pb.lsn)),
|
||||
replica: pb.replica,
|
||||
full: pb.full,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -205,6 +208,7 @@ impl From<GetBaseBackupRequest> for proto::GetBaseBackupRequest {
|
||||
Self {
|
||||
lsn: request.lsn.unwrap_or_default().0,
|
||||
replica: request.replica,
|
||||
full: request.full,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -455,7 +459,7 @@ impl GetPageResponse {
|
||||
/// These are effectively equivalent to gRPC statuses. However, we use a bidirectional stream
|
||||
/// (potentially shared by many backends), and a gRPC status response would terminate the stream so
|
||||
/// we send GetPageResponse messages with these codes instead.
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
#[derive(Clone, Copy, Debug, PartialEq, strum_macros::Display)]
|
||||
pub enum GetPageStatusCode {
|
||||
/// Unknown status. For forwards compatibility: used when an older client version receives a new
|
||||
/// status code from a newer server version.
|
||||
|
||||
@@ -25,6 +25,7 @@ tokio.workspace = true
|
||||
tokio-stream.workspace = true
|
||||
tokio-util.workspace = true
|
||||
tonic.workspace = true
|
||||
url.workspace = true
|
||||
|
||||
pageserver_client.workspace = true
|
||||
pageserver_api.workspace = true
|
||||
|
||||
@@ -1,20 +1,29 @@
|
||||
use std::collections::HashMap;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::ops::Range;
|
||||
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
|
||||
use std::pin::Pin;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::Instant;
|
||||
|
||||
use anyhow::Context;
|
||||
use anyhow::anyhow;
|
||||
use futures::TryStreamExt as _;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use pageserver_client::mgmt_api::ForceAwaitLogicalSize;
|
||||
use pageserver_client::page_service::BasebackupRequest;
|
||||
use pageserver_page_api as page_api;
|
||||
use rand::prelude::*;
|
||||
use tokio::io::AsyncRead;
|
||||
use tokio::sync::Barrier;
|
||||
use tokio::task::JoinSet;
|
||||
use tokio_util::compat::{TokioAsyncReadCompatExt as _, TokioAsyncWriteCompatExt as _};
|
||||
use tokio_util::io::StreamReader;
|
||||
use tonic::async_trait;
|
||||
use tracing::{info, instrument};
|
||||
use url::Url;
|
||||
use utils::id::TenantTimelineId;
|
||||
use utils::lsn::Lsn;
|
||||
use utils::shard::ShardIndex;
|
||||
|
||||
use crate::util::tokio_thread_local_stats::AllThreadLocalStats;
|
||||
use crate::util::{request_stats, tokio_thread_local_stats};
|
||||
@@ -24,14 +33,15 @@ use crate::util::{request_stats, tokio_thread_local_stats};
|
||||
pub(crate) struct Args {
|
||||
#[clap(long, default_value = "http://localhost:9898")]
|
||||
mgmt_api_endpoint: String,
|
||||
#[clap(long, default_value = "postgres://postgres@localhost:64000")]
|
||||
/// The Pageserver to connect to. Use postgresql:// for libpq, or grpc:// for gRPC.
|
||||
#[clap(long, default_value = "postgresql://postgres@localhost:64000")]
|
||||
page_service_connstring: String,
|
||||
#[clap(long)]
|
||||
pageserver_jwt: Option<String>,
|
||||
#[clap(long, default_value = "1")]
|
||||
num_clients: NonZeroUsize,
|
||||
#[clap(long, default_value = "1.0")]
|
||||
gzip_probability: f64,
|
||||
#[clap(long)]
|
||||
no_compression: bool,
|
||||
#[clap(long)]
|
||||
runtime: Option<humantime::Duration>,
|
||||
#[clap(long)]
|
||||
@@ -146,12 +156,27 @@ async fn main_impl(
|
||||
|
||||
let mut work_senders = HashMap::new();
|
||||
let mut tasks = Vec::new();
|
||||
for tl in &timelines {
|
||||
let scheme = match Url::parse(&args.page_service_connstring) {
|
||||
Ok(url) => url.scheme().to_lowercase().to_string(),
|
||||
Err(url::ParseError::RelativeUrlWithoutBase) => "postgresql".to_string(),
|
||||
Err(err) => return Err(anyhow!("invalid connstring: {err}")),
|
||||
};
|
||||
for &tl in &timelines {
|
||||
let (sender, receiver) = tokio::sync::mpsc::channel(1); // TODO: not sure what the implications of this are
|
||||
work_senders.insert(tl, sender);
|
||||
tasks.push(tokio::spawn(client(
|
||||
args,
|
||||
*tl,
|
||||
|
||||
let client: Box<dyn Client> = match scheme.as_str() {
|
||||
"postgresql" | "postgres" => Box::new(
|
||||
LibpqClient::new(&args.page_service_connstring, tl, !args.no_compression).await?,
|
||||
),
|
||||
"grpc" => Box::new(
|
||||
GrpcClient::new(&args.page_service_connstring, tl, !args.no_compression).await?,
|
||||
),
|
||||
scheme => return Err(anyhow!("invalid scheme {scheme}")),
|
||||
};
|
||||
|
||||
tasks.push(tokio::spawn(run_worker(
|
||||
client,
|
||||
Arc::clone(&start_work_barrier),
|
||||
receiver,
|
||||
Arc::clone(&all_work_done_barrier),
|
||||
@@ -166,13 +191,7 @@ async fn main_impl(
|
||||
let mut rng = rand::thread_rng();
|
||||
let target = all_targets.choose(&mut rng).unwrap();
|
||||
let lsn = target.lsn_range.clone().map(|r| rng.gen_range(r));
|
||||
(
|
||||
target.timeline,
|
||||
Work {
|
||||
lsn,
|
||||
gzip: rng.gen_bool(args.gzip_probability),
|
||||
},
|
||||
)
|
||||
(target.timeline, Work { lsn })
|
||||
};
|
||||
let sender = work_senders.get(&timeline).unwrap();
|
||||
// TODO: what if this blocks?
|
||||
@@ -216,13 +235,11 @@ async fn main_impl(
|
||||
#[derive(Copy, Clone)]
|
||||
struct Work {
|
||||
lsn: Option<Lsn>,
|
||||
gzip: bool,
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
async fn client(
|
||||
args: &'static Args,
|
||||
timeline: TenantTimelineId,
|
||||
async fn run_worker(
|
||||
mut client: Box<dyn Client>,
|
||||
start_work_barrier: Arc<Barrier>,
|
||||
mut work: tokio::sync::mpsc::Receiver<Work>,
|
||||
all_work_done_barrier: Arc<Barrier>,
|
||||
@@ -230,37 +247,14 @@ async fn client(
|
||||
) {
|
||||
start_work_barrier.wait().await;
|
||||
|
||||
let client = pageserver_client::page_service::Client::new(args.page_service_connstring.clone())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
while let Some(Work { lsn, gzip }) = work.recv().await {
|
||||
while let Some(Work { lsn }) = work.recv().await {
|
||||
let start = Instant::now();
|
||||
let copy_out_stream = client
|
||||
.basebackup(&BasebackupRequest {
|
||||
tenant_id: timeline.tenant_id,
|
||||
timeline_id: timeline.timeline_id,
|
||||
lsn,
|
||||
gzip,
|
||||
})
|
||||
.await
|
||||
.with_context(|| format!("start basebackup for {timeline}"))
|
||||
.unwrap();
|
||||
let stream = client.basebackup(lsn).await.unwrap();
|
||||
|
||||
use futures::StreamExt;
|
||||
let size = Arc::new(AtomicUsize::new(0));
|
||||
copy_out_stream
|
||||
.for_each({
|
||||
|r| {
|
||||
let size = Arc::clone(&size);
|
||||
async move {
|
||||
let size = Arc::clone(&size);
|
||||
size.fetch_add(r.unwrap().len(), Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
})
|
||||
.await;
|
||||
info!("basebackup size is {} bytes", size.load(Ordering::Relaxed));
|
||||
let size = futures::io::copy(stream.compat(), &mut tokio::io::sink().compat_write())
|
||||
.await
|
||||
.unwrap();
|
||||
info!("basebackup size is {size} bytes");
|
||||
let elapsed = start.elapsed();
|
||||
live_stats.inc();
|
||||
STATS.with(|stats| {
|
||||
@@ -270,3 +264,94 @@ async fn client(
|
||||
|
||||
all_work_done_barrier.wait().await;
|
||||
}
|
||||
|
||||
/// A basebackup client. This allows switching out the client protocol implementation.
|
||||
#[async_trait]
|
||||
trait Client: Send {
|
||||
/// Requests a basebackup at `lsn` and returns the response body as an async reader.
/// NOTE(review): the meaning of `lsn == None` is decided by the implementation and the
/// server; presumably "latest" — confirm.
async fn basebackup(
|
||||
&mut self,
|
||||
lsn: Option<Lsn>,
|
||||
) -> anyhow::Result<Pin<Box<dyn AsyncRead + Send>>>;
|
||||
}
|
||||
|
||||
/// A libpq-based Pageserver client.
|
||||
struct LibpqClient {
|
||||
// The underlying page service client.
inner: pageserver_client::page_service::Client,
|
||||
// Tenant and timeline that basebackup requests are issued for.
ttid: TenantTimelineId,
|
||||
// Passed as the `gzip` flag of each basebackup request.
compression: bool,
|
||||
}
|
||||
|
||||
impl LibpqClient {
|
||||
async fn new(
|
||||
connstring: &str,
|
||||
ttid: TenantTimelineId,
|
||||
compression: bool,
|
||||
) -> anyhow::Result<Self> {
|
||||
Ok(Self {
|
||||
inner: pageserver_client::page_service::Client::new(connstring.to_string()).await?,
|
||||
ttid,
|
||||
compression,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Client for LibpqClient {
|
||||
async fn basebackup(
|
||||
&mut self,
|
||||
lsn: Option<Lsn>,
|
||||
) -> anyhow::Result<Pin<Box<dyn AsyncRead + Send + 'static>>> {
|
||||
let req = BasebackupRequest {
|
||||
tenant_id: self.ttid.tenant_id,
|
||||
timeline_id: self.ttid.timeline_id,
|
||||
lsn,
|
||||
gzip: self.compression,
|
||||
};
|
||||
let stream = self.inner.basebackup(&req).await?;
|
||||
Ok(Box::pin(StreamReader::new(
|
||||
stream.map_err(std::io::Error::other),
|
||||
)))
|
||||
}
|
||||
}
|
||||
|
||||
/// A gRPC Pageserver client.
|
||||
struct GrpcClient {
|
||||
// The underlying gRPC client; already bound to a tenant, timeline and shard at construction.
inner: page_api::Client,
|
||||
}
|
||||
|
||||
impl GrpcClient {
|
||||
async fn new(
|
||||
connstring: &str,
|
||||
ttid: TenantTimelineId,
|
||||
compression: bool,
|
||||
) -> anyhow::Result<Self> {
|
||||
let inner = page_api::Client::new(
|
||||
connstring.to_string(),
|
||||
ttid.tenant_id,
|
||||
ttid.timeline_id,
|
||||
ShardIndex::unsharded(),
|
||||
None,
|
||||
compression.then_some(tonic::codec::CompressionEncoding::Zstd),
|
||||
)
|
||||
.await?;
|
||||
Ok(Self { inner })
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Client for GrpcClient {
|
||||
async fn basebackup(
|
||||
&mut self,
|
||||
lsn: Option<Lsn>,
|
||||
) -> anyhow::Result<Pin<Box<dyn AsyncRead + Send + 'static>>> {
|
||||
let req = page_api::GetBaseBackupRequest {
|
||||
lsn,
|
||||
replica: false,
|
||||
full: false,
|
||||
};
|
||||
let stream = self.inner.get_base_backup(req).await?;
|
||||
Ok(Box::pin(StreamReader::new(
|
||||
stream.map_err(std::io::Error::other),
|
||||
)))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,33 +10,31 @@ use anyhow::Context;
|
||||
use async_trait::async_trait;
|
||||
use bytes::Bytes;
|
||||
use camino::Utf8PathBuf;
|
||||
use futures::{Stream, StreamExt as _};
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::keyspace::KeySpaceAccum;
|
||||
use pageserver_api::models::{PagestreamGetPageRequest, PagestreamRequest};
|
||||
use pageserver_api::pagestream_api::{PagestreamGetPageRequest, PagestreamRequest};
|
||||
use pageserver_api::reltag::RelTag;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use pageserver_page_api::proto;
|
||||
use pageserver_page_api as page_api;
|
||||
use rand::prelude::*;
|
||||
use tokio::task::JoinSet;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::info;
|
||||
use url::Url;
|
||||
use utils::id::TenantTimelineId;
|
||||
use utils::lsn::Lsn;
|
||||
use utils::shard::ShardIndex;
|
||||
|
||||
use crate::util::tokio_thread_local_stats::AllThreadLocalStats;
|
||||
use crate::util::{request_stats, tokio_thread_local_stats};
|
||||
|
||||
#[derive(clap::ValueEnum, Clone, Debug)]
|
||||
enum Protocol {
|
||||
Libpq,
|
||||
Grpc,
|
||||
}
|
||||
|
||||
/// GetPage@LatestLSN, uniformly distributed across the compute-accessible keyspace.
|
||||
#[derive(clap::Parser)]
|
||||
pub(crate) struct Args {
|
||||
#[clap(long, default_value = "http://localhost:9898")]
|
||||
mgmt_api_endpoint: String,
|
||||
/// Pageserver connection string. Supports postgresql:// and grpc:// protocols.
|
||||
#[clap(long, default_value = "postgres://postgres@localhost:64000")]
|
||||
page_service_connstring: String,
|
||||
#[clap(long)]
|
||||
@@ -45,8 +43,9 @@ pub(crate) struct Args {
|
||||
num_clients: NonZeroUsize,
|
||||
#[clap(long)]
|
||||
runtime: Option<humantime::Duration>,
|
||||
#[clap(long, value_enum, default_value = "libpq")]
|
||||
protocol: Protocol,
|
||||
/// If true, enable compression (only for gRPC).
|
||||
#[clap(long)]
|
||||
compression: bool,
|
||||
/// Each client sends requests at the given rate.
|
||||
///
|
||||
/// If a request takes too long and we should be issuing a new request already,
|
||||
@@ -325,18 +324,32 @@ async fn main_impl(
|
||||
.unwrap();
|
||||
|
||||
Box::pin(async move {
|
||||
let client: Box<dyn Client> = match args.protocol {
|
||||
Protocol::Libpq => Box::new(
|
||||
LibpqClient::new(args.page_service_connstring.clone(), worker_id.timeline)
|
||||
.await
|
||||
.unwrap(),
|
||||
let scheme = match Url::parse(&args.page_service_connstring) {
|
||||
Ok(url) => url.scheme().to_lowercase().to_string(),
|
||||
Err(url::ParseError::RelativeUrlWithoutBase) => "postgresql".to_string(),
|
||||
Err(err) => panic!("invalid connstring: {err}"),
|
||||
};
|
||||
let client: Box<dyn Client> = match scheme.as_str() {
|
||||
"postgresql" | "postgres" => {
|
||||
assert!(!args.compression, "libpq does not support compression");
|
||||
Box::new(
|
||||
LibpqClient::new(&args.page_service_connstring, worker_id.timeline)
|
||||
.await
|
||||
.unwrap(),
|
||||
)
|
||||
}
|
||||
|
||||
"grpc" => Box::new(
|
||||
GrpcClient::new(
|
||||
&args.page_service_connstring,
|
||||
worker_id.timeline,
|
||||
args.compression,
|
||||
)
|
||||
.await
|
||||
.unwrap(),
|
||||
),
|
||||
|
||||
Protocol::Grpc => Box::new(
|
||||
GrpcClient::new(args.page_service_connstring.clone(), worker_id.timeline)
|
||||
.await
|
||||
.unwrap(),
|
||||
),
|
||||
scheme => panic!("unsupported scheme {scheme}"),
|
||||
};
|
||||
run_worker(args, client, ss, cancel, rps_period, ranges, weights).await
|
||||
})
|
||||
@@ -543,8 +556,8 @@ struct LibpqClient {
|
||||
}
|
||||
|
||||
impl LibpqClient {
|
||||
async fn new(connstring: String, ttid: TenantTimelineId) -> anyhow::Result<Self> {
|
||||
let inner = pageserver_client::page_service::Client::new(connstring)
|
||||
async fn new(connstring: &str, ttid: TenantTimelineId) -> anyhow::Result<Self> {
|
||||
let inner = pageserver_client::page_service::Client::new(connstring.to_string())
|
||||
.await?
|
||||
.pagestream(ttid.tenant_id, ttid.timeline_id)
|
||||
.await?;
|
||||
@@ -600,34 +613,36 @@ impl Client for LibpqClient {
|
||||
}
|
||||
}
|
||||
|
||||
/// A gRPC client using the raw, no-frills gRPC client.
|
||||
/// A gRPC Pageserver client.
|
||||
struct GrpcClient {
|
||||
req_tx: tokio::sync::mpsc::Sender<proto::GetPageRequest>,
|
||||
resp_rx: tonic::Streaming<proto::GetPageResponse>,
|
||||
req_tx: tokio::sync::mpsc::Sender<page_api::GetPageRequest>,
|
||||
resp_rx: Pin<Box<dyn Stream<Item = Result<page_api::GetPageResponse, tonic::Status>> + Send>>,
|
||||
}
|
||||
|
||||
impl GrpcClient {
|
||||
async fn new(connstring: String, ttid: TenantTimelineId) -> anyhow::Result<Self> {
|
||||
let mut client = pageserver_page_api::proto::PageServiceClient::connect(connstring).await?;
|
||||
async fn new(
|
||||
connstring: &str,
|
||||
ttid: TenantTimelineId,
|
||||
compression: bool,
|
||||
) -> anyhow::Result<Self> {
|
||||
let mut client = page_api::Client::new(
|
||||
connstring.to_string(),
|
||||
ttid.tenant_id,
|
||||
ttid.timeline_id,
|
||||
ShardIndex::unsharded(),
|
||||
None,
|
||||
compression.then_some(tonic::codec::CompressionEncoding::Zstd),
|
||||
)
|
||||
.await?;
|
||||
|
||||
// The channel has a buffer size of 1, since 0 is not allowed. It does not matter, since the
|
||||
// benchmark will control the queue depth (i.e. in-flight requests) anyway, and requests are
|
||||
// buffered by Tonic and the OS too.
|
||||
let (req_tx, req_rx) = tokio::sync::mpsc::channel(1);
|
||||
let req_stream = tokio_stream::wrappers::ReceiverStream::new(req_rx);
|
||||
let mut req = tonic::Request::new(req_stream);
|
||||
let metadata = req.metadata_mut();
|
||||
metadata.insert("neon-tenant-id", ttid.tenant_id.to_string().try_into()?);
|
||||
metadata.insert("neon-timeline-id", ttid.timeline_id.to_string().try_into()?);
|
||||
metadata.insert("neon-shard-id", "0000".try_into()?);
|
||||
let resp_rx = Box::pin(client.get_pages(req_stream).await?);
|
||||
|
||||
let resp = client.get_pages(req).await?;
|
||||
let resp_stream = resp.into_inner();
|
||||
|
||||
Ok(Self {
|
||||
req_tx,
|
||||
resp_rx: resp_stream,
|
||||
})
|
||||
Ok(Self { req_tx, resp_rx })
|
||||
}
|
||||
}
|
||||
|
||||
@@ -641,27 +656,27 @@ impl Client for GrpcClient {
|
||||
rel: RelTag,
|
||||
blks: Vec<u32>,
|
||||
) -> anyhow::Result<()> {
|
||||
let req = proto::GetPageRequest {
|
||||
let req = page_api::GetPageRequest {
|
||||
request_id: req_id,
|
||||
request_class: proto::GetPageClass::Normal as i32,
|
||||
read_lsn: Some(proto::ReadLsn {
|
||||
request_lsn: req_lsn.0,
|
||||
not_modified_since_lsn: mod_lsn.0,
|
||||
}),
|
||||
rel: Some(rel.into()),
|
||||
block_number: blks,
|
||||
request_class: page_api::GetPageClass::Normal,
|
||||
read_lsn: page_api::ReadLsn {
|
||||
request_lsn: req_lsn,
|
||||
not_modified_since_lsn: Some(mod_lsn),
|
||||
},
|
||||
rel,
|
||||
block_numbers: blks,
|
||||
};
|
||||
self.req_tx.send(req).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn recv_get_page(&mut self) -> anyhow::Result<(u64, Vec<Bytes>)> {
|
||||
let resp = self.resp_rx.message().await?.unwrap();
|
||||
let resp = self.resp_rx.next().await.unwrap().unwrap();
|
||||
anyhow::ensure!(
|
||||
resp.status_code == proto::GetPageStatusCode::Ok as i32,
|
||||
resp.status_code == page_api::GetPageStatusCode::Ok,
|
||||
"unexpected status code: {}",
|
||||
resp.status_code
|
||||
resp.status_code,
|
||||
);
|
||||
Ok((resp.request_id, resp.page_image))
|
||||
Ok((resp.request_id, resp.page_images))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,13 +18,12 @@ use bytes::{BufMut, Bytes, BytesMut};
|
||||
use fail::fail_point;
|
||||
use pageserver_api::key::{Key, rel_block_to_key};
|
||||
use pageserver_api::reltag::{RelTag, SlruKind};
|
||||
use postgres_ffi::pg_constants::{
|
||||
DEFAULTTABLESPACE_OID, GLOBALTABLESPACE_OID, PG_HBA, PGDATA_SPECIAL_FILES,
|
||||
};
|
||||
use postgres_ffi::relfile_utils::{INIT_FORKNUM, MAIN_FORKNUM};
|
||||
use postgres_ffi::pg_constants::{PG_HBA, PGDATA_SPECIAL_FILES};
|
||||
use postgres_ffi::{
|
||||
BLCKSZ, PG_TLI, RELSEG_SIZE, WAL_SEGMENT_SIZE, XLogFileName, dispatch_pgversion, pg_constants,
|
||||
};
|
||||
use postgres_ffi_types::constants::{DEFAULTTABLESPACE_OID, GLOBALTABLESPACE_OID};
|
||||
use postgres_ffi_types::forknum::{INIT_FORKNUM, MAIN_FORKNUM};
|
||||
use tokio::io;
|
||||
use tokio::io::AsyncWrite;
|
||||
use tokio_tar::{Builder, EntryType, Header};
|
||||
@@ -372,6 +371,7 @@ where
|
||||
.partition(
|
||||
self.timeline.get_shard_identity(),
|
||||
self.timeline.conf.max_get_vectored_keys.get() as u64 * BLCKSZ as u64,
|
||||
BLCKSZ as u64,
|
||||
);
|
||||
|
||||
let mut slru_builder = SlruSegmentsBuilder::new(&mut self.ar);
|
||||
|
||||
@@ -2,7 +2,9 @@ use std::io::{Read, Write, stdin, stdout};
|
||||
use std::time::Duration;
|
||||
|
||||
use clap::Parser;
|
||||
use pageserver_api::models::{PagestreamRequest, PagestreamTestRequest};
|
||||
use pageserver_api::pagestream_api::{
|
||||
PagestreamFeMessage, PagestreamRequest, PagestreamTestRequest,
|
||||
};
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
@@ -28,17 +30,15 @@ async fn main() -> anyhow::Result<()> {
|
||||
let mut msg = 0;
|
||||
loop {
|
||||
msg += 1;
|
||||
let fut = sender.send(pageserver_api::models::PagestreamFeMessage::Test(
|
||||
PagestreamTestRequest {
|
||||
hdr: PagestreamRequest {
|
||||
reqid: 0,
|
||||
request_lsn: Lsn(23),
|
||||
not_modified_since: Lsn(23),
|
||||
},
|
||||
batch_key: 42,
|
||||
message: format!("message {}", msg),
|
||||
let fut = sender.send(PagestreamFeMessage::Test(PagestreamTestRequest {
|
||||
hdr: PagestreamRequest {
|
||||
reqid: 0,
|
||||
request_lsn: Lsn(23),
|
||||
not_modified_since: Lsn(23),
|
||||
},
|
||||
));
|
||||
batch_key: 42,
|
||||
message: format!("message {}", msg),
|
||||
}));
|
||||
let Ok(res) = tokio::time::timeout(Duration::from_secs(10), fut).await else {
|
||||
eprintln!("pipe seems full");
|
||||
break;
|
||||
|
||||
@@ -520,7 +520,7 @@ async fn import_file(
|
||||
}
|
||||
|
||||
if file_path.starts_with("global") {
|
||||
let spcnode = postgres_ffi::pg_constants::GLOBALTABLESPACE_OID;
|
||||
let spcnode = postgres_ffi_types::constants::GLOBALTABLESPACE_OID;
|
||||
let dbnode = 0;
|
||||
|
||||
match file_name.as_ref() {
|
||||
@@ -553,7 +553,7 @@ async fn import_file(
|
||||
}
|
||||
}
|
||||
} else if file_path.starts_with("base") {
|
||||
let spcnode = pg_constants::DEFAULTTABLESPACE_OID;
|
||||
let spcnode = postgres_ffi_types::constants::DEFAULTTABLESPACE_OID;
|
||||
let dbnode: u32 = file_path
|
||||
.iter()
|
||||
.nth(1)
|
||||
|
||||
@@ -25,12 +25,13 @@ use pageserver_api::config::{
|
||||
PageServiceProtocolPipelinedBatchingStrategy, PageServiceProtocolPipelinedExecutionStrategy,
|
||||
};
|
||||
use pageserver_api::key::rel_block_to_key;
|
||||
use pageserver_api::models::{
|
||||
self, PageTraceEvent, PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
|
||||
use pageserver_api::models::{PageTraceEvent, TenantState};
|
||||
use pageserver_api::pagestream_api::{
|
||||
self, PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
|
||||
PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
|
||||
PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetSlruSegmentRequest,
|
||||
PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest, PagestreamNblocksResponse,
|
||||
PagestreamProtocolVersion, PagestreamRequest, TenantState,
|
||||
PagestreamProtocolVersion, PagestreamRequest,
|
||||
};
|
||||
use pageserver_api::reltag::SlruKind;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
@@ -40,7 +41,7 @@ use postgres_backend::{
|
||||
AuthType, PostgresBackend, PostgresBackendReader, QueryError, is_expected_io_error,
|
||||
};
|
||||
use postgres_ffi::BLCKSZ;
|
||||
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
|
||||
use postgres_ffi_types::constants::DEFAULTTABLESPACE_OID;
|
||||
use pq_proto::framed::ConnectionError;
|
||||
use pq_proto::{BeMessage, FeMessage, FeStartupPacket, RowDescriptor};
|
||||
use smallvec::{SmallVec, smallvec};
|
||||
@@ -712,7 +713,7 @@ struct BatchedGetPageRequest {
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
struct BatchedTestRequest {
|
||||
req: models::PagestreamTestRequest,
|
||||
req: pagestream_api::PagestreamTestRequest,
|
||||
timer: SmgrOpTimer,
|
||||
}
|
||||
|
||||
@@ -726,13 +727,13 @@ enum BatchedFeMessage {
|
||||
span: Span,
|
||||
timer: SmgrOpTimer,
|
||||
shard: WeakHandle<TenantManagerTypes>,
|
||||
req: models::PagestreamExistsRequest,
|
||||
req: PagestreamExistsRequest,
|
||||
},
|
||||
Nblocks {
|
||||
span: Span,
|
||||
timer: SmgrOpTimer,
|
||||
shard: WeakHandle<TenantManagerTypes>,
|
||||
req: models::PagestreamNblocksRequest,
|
||||
req: PagestreamNblocksRequest,
|
||||
},
|
||||
GetPage {
|
||||
span: Span,
|
||||
@@ -744,13 +745,13 @@ enum BatchedFeMessage {
|
||||
span: Span,
|
||||
timer: SmgrOpTimer,
|
||||
shard: WeakHandle<TenantManagerTypes>,
|
||||
req: models::PagestreamDbSizeRequest,
|
||||
req: PagestreamDbSizeRequest,
|
||||
},
|
||||
GetSlruSegment {
|
||||
span: Span,
|
||||
timer: SmgrOpTimer,
|
||||
shard: WeakHandle<TenantManagerTypes>,
|
||||
req: models::PagestreamGetSlruSegmentRequest,
|
||||
req: PagestreamGetSlruSegmentRequest,
|
||||
},
|
||||
#[cfg(feature = "testing")]
|
||||
Test {
|
||||
@@ -2443,10 +2444,9 @@ impl PageServerHandler {
|
||||
.map(|(req, res)| {
|
||||
res.map(|page| {
|
||||
(
|
||||
PagestreamBeMessage::GetPage(models::PagestreamGetPageResponse {
|
||||
req: req.req,
|
||||
page,
|
||||
}),
|
||||
PagestreamBeMessage::GetPage(
|
||||
pagestream_api::PagestreamGetPageResponse { req: req.req, page },
|
||||
),
|
||||
req.timer,
|
||||
req.ctx,
|
||||
)
|
||||
@@ -2513,7 +2513,7 @@ impl PageServerHandler {
|
||||
.map(|(req, res)| {
|
||||
res.map(|()| {
|
||||
(
|
||||
PagestreamBeMessage::Test(models::PagestreamTestResponse {
|
||||
PagestreamBeMessage::Test(pagestream_api::PagestreamTestResponse {
|
||||
req: req.req.clone(),
|
||||
}),
|
||||
req.timer,
|
||||
@@ -3286,7 +3286,14 @@ impl GrpcPageServiceHandler {
|
||||
Ok(req)
|
||||
}))
|
||||
// Run the page service.
|
||||
.service(proto::PageServiceServer::new(page_service_handler));
|
||||
.service(
|
||||
proto::PageServiceServer::new(page_service_handler)
|
||||
// Support both gzip and zstd compression. The client decides what to use.
|
||||
.accept_compressed(tonic::codec::CompressionEncoding::Gzip)
|
||||
.accept_compressed(tonic::codec::CompressionEncoding::Zstd)
|
||||
.send_compressed(tonic::codec::CompressionEncoding::Gzip)
|
||||
.send_compressed(tonic::codec::CompressionEncoding::Zstd),
|
||||
);
|
||||
let server = server.add_service(page_service);
|
||||
|
||||
// Reflection service for use with e.g. grpcurl.
|
||||
@@ -3532,7 +3539,6 @@ impl proto::PageService for GrpcPageServiceHandler {
|
||||
Ok(tonic::Response::new(resp.into()))
|
||||
}
|
||||
|
||||
// TODO: ensure clients use gzip compression for the stream.
|
||||
#[instrument(skip_all, fields(lsn))]
|
||||
async fn get_base_backup(
|
||||
&self,
|
||||
@@ -3572,9 +3578,6 @@ impl proto::PageService for GrpcPageServiceHandler {
|
||||
}
|
||||
|
||||
// Spawn a task to run the basebackup.
|
||||
//
|
||||
// TODO: do we need to support full base backups, for debugging? This also requires passing
|
||||
// the prev_lsn parameter.
|
||||
let span = Span::current();
|
||||
let (mut simplex_read, mut simplex_write) = tokio::io::simplex(CHUNK_SIZE);
|
||||
let jh = tokio::spawn(async move {
|
||||
@@ -3583,7 +3586,7 @@ impl proto::PageService for GrpcPageServiceHandler {
|
||||
&timeline,
|
||||
req.lsn,
|
||||
None,
|
||||
false,
|
||||
req.full,
|
||||
req.replica,
|
||||
&ctx,
|
||||
)
|
||||
|
||||
@@ -23,12 +23,11 @@ use pageserver_api::key::{
|
||||
};
|
||||
use pageserver_api::keyspace::{KeySpaceRandomAccum, SparseKeySpace};
|
||||
use pageserver_api::models::RelSizeMigration;
|
||||
use pageserver_api::record::NeonWalRecord;
|
||||
use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
|
||||
use pageserver_api::shard::ShardIdentity;
|
||||
use pageserver_api::value::Value;
|
||||
use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
|
||||
use postgres_ffi::{BLCKSZ, Oid, RepOriginId, TimestampTz, TransactionId};
|
||||
use postgres_ffi::{BLCKSZ, TimestampTz, TransactionId};
|
||||
use postgres_ffi_types::forknum::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
|
||||
use postgres_ffi_types::{Oid, RepOriginId};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use strum::IntoEnumIterator;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -36,6 +35,8 @@ use tracing::{debug, info, info_span, trace, warn};
|
||||
use utils::bin_ser::{BeSer, DeserializeError};
|
||||
use utils::lsn::Lsn;
|
||||
use utils::pausable_failpoint;
|
||||
use wal_decoder::models::record::NeonWalRecord;
|
||||
use wal_decoder::models::value::Value;
|
||||
use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta};
|
||||
|
||||
use super::tenant::{PageReconstructError, Timeline};
|
||||
@@ -720,6 +721,7 @@ impl Timeline {
|
||||
let batches = keyspace.partition(
|
||||
self.get_shard_identity(),
|
||||
self.conf.max_get_vectored_keys.get() as u64 * BLCKSZ as u64,
|
||||
BLCKSZ as u64,
|
||||
);
|
||||
|
||||
let io_concurrency = IoConcurrency::spawn_from_conf(
|
||||
@@ -960,6 +962,7 @@ impl Timeline {
|
||||
let batches = keyspace.partition(
|
||||
self.get_shard_identity(),
|
||||
self.conf.max_get_vectored_keys.get() as u64 * BLCKSZ as u64,
|
||||
BLCKSZ as u64,
|
||||
);
|
||||
|
||||
let io_concurrency = IoConcurrency::spawn_from_conf(
|
||||
|
||||
@@ -496,7 +496,7 @@ impl WalRedoManager {
|
||||
key: pageserver_api::key::Key,
|
||||
lsn: Lsn,
|
||||
base_img: Option<(Lsn, bytes::Bytes)>,
|
||||
records: Vec<(Lsn, pageserver_api::record::NeonWalRecord)>,
|
||||
records: Vec<(Lsn, wal_decoder::models::record::NeonWalRecord)>,
|
||||
pg_version: u32,
|
||||
redo_attempt_type: RedoAttemptType,
|
||||
) -> Result<bytes::Bytes, walredo::Error> {
|
||||
@@ -1859,6 +1859,29 @@ impl TenantShard {
|
||||
}
|
||||
}
|
||||
|
||||
// At this point we've initialized all timelines and are tracking them.
|
||||
// Now compute the layer visibility for all (not offloaded) timelines.
|
||||
let compute_visiblity_for = {
|
||||
let timelines_accessor = self.timelines.lock().unwrap();
|
||||
let mut timelines_offloaded_accessor = self.timelines_offloaded.lock().unwrap();
|
||||
|
||||
timelines_offloaded_accessor.extend(offloaded_timelines_list.into_iter());
|
||||
|
||||
// Before activation, populate each Timeline's GcInfo with information about its children
|
||||
self.initialize_gc_info(&timelines_accessor, &timelines_offloaded_accessor, None);
|
||||
|
||||
timelines_accessor.values().cloned().collect::<Vec<_>>()
|
||||
};
|
||||
|
||||
for tl in compute_visiblity_for {
|
||||
tl.update_layer_visibility().await.with_context(|| {
|
||||
format!(
|
||||
"failed initial timeline visibility computation {} for tenant {}",
|
||||
tl.timeline_id, self.tenant_shard_id
|
||||
)
|
||||
})?;
|
||||
}
|
||||
|
||||
// Walk through deleted timelines, resume deletion
|
||||
for (timeline_id, index_part, remote_timeline_client) in timelines_to_resume_deletions {
|
||||
remote_timeline_client
|
||||
@@ -1878,10 +1901,6 @@ impl TenantShard {
|
||||
.context("resume_deletion")
|
||||
.map_err(LoadLocalTimelineError::ResumeDeletion)?;
|
||||
}
|
||||
{
|
||||
let mut offloaded_timelines_accessor = self.timelines_offloaded.lock().unwrap();
|
||||
offloaded_timelines_accessor.extend(offloaded_timelines_list.into_iter());
|
||||
}
|
||||
|
||||
// Stash the preloaded tenant manifest, and upload a new manifest if changed.
|
||||
//
|
||||
@@ -3449,9 +3468,6 @@ impl TenantShard {
|
||||
.values()
|
||||
.filter(|timeline| !(timeline.is_broken() || timeline.is_stopping()));
|
||||
|
||||
// Before activation, populate each Timeline's GcInfo with information about its children
|
||||
self.initialize_gc_info(&timelines_accessor, &timelines_offloaded_accessor, None);
|
||||
|
||||
// Spawn gc and compaction loops. The loops will shut themselves
|
||||
// down when they notice that the tenant is inactive.
|
||||
tasks::start_background_loops(self, background_jobs_can_start);
|
||||
@@ -5836,10 +5852,10 @@ pub(crate) mod harness {
|
||||
use once_cell::sync::OnceCell;
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::models::ShardParameters;
|
||||
use pageserver_api::record::NeonWalRecord;
|
||||
use pageserver_api::shard::ShardIndex;
|
||||
use utils::id::TenantId;
|
||||
use utils::logging;
|
||||
use wal_decoder::models::record::NeonWalRecord;
|
||||
|
||||
use super::*;
|
||||
use crate::deletion_queue::mock::MockDeletionQueue;
|
||||
@@ -6094,9 +6110,6 @@ mod tests {
|
||||
#[cfg(feature = "testing")]
|
||||
use pageserver_api::keyspace::KeySpaceRandomAccum;
|
||||
use pageserver_api::models::{CompactionAlgorithm, CompactionAlgorithmSettings};
|
||||
#[cfg(feature = "testing")]
|
||||
use pageserver_api::record::NeonWalRecord;
|
||||
use pageserver_api::value::Value;
|
||||
use pageserver_compaction::helpers::overlaps_with;
|
||||
#[cfg(feature = "testing")]
|
||||
use rand::SeedableRng;
|
||||
@@ -6117,6 +6130,9 @@ mod tests {
|
||||
use timeline::{CompactOptions, DeltaLayerTestDesc, VersionedKeySpaceQuery};
|
||||
use utils::id::TenantId;
|
||||
use utils::shard::{ShardCount, ShardNumber};
|
||||
#[cfg(feature = "testing")]
|
||||
use wal_decoder::models::record::NeonWalRecord;
|
||||
use wal_decoder::models::value::Value;
|
||||
|
||||
use super::*;
|
||||
use crate::DEFAULT_PG_VERSION;
|
||||
|
||||
@@ -61,8 +61,10 @@ pub(crate) struct LocationConf {
|
||||
/// The detailed shard identity. This structure is already scoped within
|
||||
/// a TenantShardId, but we need the full ShardIdentity to enable calculating
|
||||
/// key->shard mappings.
|
||||
// TODO(vlad): Remove this default once all configs have a shard identity on disk.
|
||||
#[serde(default = "ShardIdentity::unsharded")]
|
||||
///
|
||||
/// NB: we store this even for unsharded tenants, so that we agree with storcon on the intended
|
||||
/// stripe size. Otherwise, a split request that does not specify a stripe size may use a
|
||||
/// different default than storcon, which can lead to incorrect stripe sizes and corruption.
|
||||
pub(crate) shard: ShardIdentity,
|
||||
|
||||
/// The pan-cluster tenant configuration, the same on all locations
|
||||
|
||||
@@ -34,11 +34,11 @@ pub use layer_name::{DeltaLayerName, ImageLayerName, LayerName};
|
||||
use pageserver_api::config::GetVectoredConcurrentIo;
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
|
||||
use pageserver_api::record::NeonWalRecord;
|
||||
use pageserver_api::value::Value;
|
||||
use tracing::{Instrument, info_span, trace};
|
||||
use utils::lsn::Lsn;
|
||||
use utils::sync::gate::GateGuard;
|
||||
use wal_decoder::models::record::NeonWalRecord;
|
||||
use wal_decoder::models::value::Value;
|
||||
|
||||
use self::inmemory_layer::InMemoryLayerFileId;
|
||||
use super::PageReconstructError;
|
||||
|
||||
@@ -4,11 +4,11 @@ use std::sync::Arc;
|
||||
|
||||
use bytes::Bytes;
|
||||
use pageserver_api::key::{KEY_SIZE, Key};
|
||||
use pageserver_api::value::Value;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::id::TimelineId;
|
||||
use utils::lsn::Lsn;
|
||||
use utils::shard::TenantShardId;
|
||||
use wal_decoder::models::value::Value;
|
||||
|
||||
use super::errors::PutError;
|
||||
use super::layer::S3_UPLOAD_LIMIT;
|
||||
|
||||
@@ -44,7 +44,6 @@ use pageserver_api::key::{DBDIR_KEY, KEY_SIZE, Key};
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::models::ImageCompressionAlgorithm;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use pageserver_api::value::Value;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::sync::OnceCell;
|
||||
use tokio_epoll_uring::IoBuf;
|
||||
@@ -54,6 +53,7 @@ use utils::bin_ser::BeSer;
|
||||
use utils::bin_ser::SerializeError;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
use wal_decoder::models::value::Value;
|
||||
|
||||
use super::errors::PutError;
|
||||
use super::{
|
||||
@@ -1306,7 +1306,7 @@ impl DeltaLayerInner {
|
||||
// is it an image or will_init walrecord?
|
||||
// FIXME: this could be handled by threading the BlobRef to the
|
||||
// VectoredReadBuilder
|
||||
let will_init = pageserver_api::value::ValueBytes::will_init(&data)
|
||||
let will_init = wal_decoder::models::value::ValueBytes::will_init(&data)
|
||||
.inspect_err(|_e| {
|
||||
#[cfg(feature = "testing")]
|
||||
tracing::error!(data=?utils::Hex(&data), err=?_e, %key, %lsn, "failed to parse will_init out of serialized value");
|
||||
@@ -1369,7 +1369,7 @@ impl DeltaLayerInner {
|
||||
format!(" img {} bytes", img.len())
|
||||
}
|
||||
Value::WalRecord(rec) => {
|
||||
let wal_desc = pageserver_api::record::describe_wal_record(&rec)?;
|
||||
let wal_desc = wal_decoder::models::record::describe_wal_record(&rec)?;
|
||||
format!(
|
||||
" rec {} bytes will_init: {} {}",
|
||||
buf.len(),
|
||||
@@ -1624,7 +1624,6 @@ pub(crate) mod test {
|
||||
|
||||
use bytes::Bytes;
|
||||
use itertools::MinMaxResult;
|
||||
use pageserver_api::value::Value;
|
||||
use rand::prelude::{SeedableRng, SliceRandom, StdRng};
|
||||
use rand::{Rng, RngCore};
|
||||
|
||||
@@ -1988,7 +1987,7 @@ pub(crate) mod test {
|
||||
#[tokio::test]
|
||||
async fn copy_delta_prefix_smoke() {
|
||||
use bytes::Bytes;
|
||||
use pageserver_api::record::NeonWalRecord;
|
||||
use wal_decoder::models::record::NeonWalRecord;
|
||||
|
||||
let h = crate::tenant::harness::TenantHarness::create("truncate_delta_smoke")
|
||||
.await
|
||||
|
||||
@@ -4,8 +4,8 @@ use std::sync::Arc;
|
||||
use anyhow::bail;
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::keyspace::{KeySpace, SparseKeySpace};
|
||||
use pageserver_api::value::Value;
|
||||
use utils::lsn::Lsn;
|
||||
use wal_decoder::models::value::Value;
|
||||
|
||||
use super::PersistentLayerKey;
|
||||
use super::merge_iterator::{MergeIterator, MergeIteratorItem};
|
||||
@@ -126,7 +126,6 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn filter_keyspace_iterator() {
|
||||
use bytes::Bytes;
|
||||
use pageserver_api::value::Value;
|
||||
|
||||
let harness = TenantHarness::create("filter_iterator_filter_keyspace_iterator")
|
||||
.await
|
||||
|
||||
@@ -42,7 +42,6 @@ use pageserver_api::config::MaxVectoredReadBytes;
|
||||
use pageserver_api::key::{DBDIR_KEY, KEY_SIZE, Key};
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::shard::{ShardIdentity, TenantShardId};
|
||||
use pageserver_api::value::Value;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::sync::OnceCell;
|
||||
use tokio_stream::StreamExt;
|
||||
@@ -52,6 +51,7 @@ use utils::bin_ser::BeSer;
|
||||
use utils::bin_ser::SerializeError;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
use wal_decoder::models::value::Value;
|
||||
|
||||
use super::errors::PutError;
|
||||
use super::layer_name::ImageLayerName;
|
||||
@@ -1232,10 +1232,10 @@ mod test {
|
||||
use itertools::Itertools;
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize};
|
||||
use pageserver_api::value::Value;
|
||||
use utils::generation::Generation;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
use wal_decoder::models::value::Value;
|
||||
|
||||
use super::{ImageLayerIterator, ImageLayerWriter};
|
||||
use crate::DEFAULT_PG_VERSION;
|
||||
|
||||
@@ -824,7 +824,7 @@ async fn evict_and_wait_does_not_wait_for_download() {
|
||||
#[tokio::test(start_paused = true)]
|
||||
async fn eviction_cancellation_on_drop() {
|
||||
use bytes::Bytes;
|
||||
use pageserver_api::value::Value;
|
||||
use wal_decoder::models::value::Value;
|
||||
|
||||
// this is the runtime on which Layer spawns the blocking tasks on
|
||||
let handle = tokio::runtime::Handle::current();
|
||||
|
||||
@@ -4,8 +4,8 @@ use std::sync::Arc;
|
||||
|
||||
use anyhow::bail;
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::value::Value;
|
||||
use utils::lsn::Lsn;
|
||||
use wal_decoder::models::value::Value;
|
||||
|
||||
use super::delta_layer::{DeltaLayerInner, DeltaLayerIterator};
|
||||
use super::image_layer::{ImageLayerInner, ImageLayerIterator};
|
||||
@@ -402,9 +402,9 @@ impl<'a> MergeIterator<'a> {
|
||||
mod tests {
|
||||
use itertools::Itertools;
|
||||
use pageserver_api::key::Key;
|
||||
#[cfg(feature = "testing")]
|
||||
use pageserver_api::record::NeonWalRecord;
|
||||
use utils::lsn::Lsn;
|
||||
#[cfg(feature = "testing")]
|
||||
use wal_decoder::models::record::NeonWalRecord;
|
||||
|
||||
use super::*;
|
||||
use crate::DEFAULT_PG_VERSION;
|
||||
@@ -436,7 +436,6 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn merge_in_between() {
|
||||
use bytes::Bytes;
|
||||
use pageserver_api::value::Value;
|
||||
|
||||
let harness = TenantHarness::create("merge_iterator_merge_in_between")
|
||||
.await
|
||||
@@ -501,7 +500,6 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn delta_merge() {
|
||||
use bytes::Bytes;
|
||||
use pageserver_api::value::Value;
|
||||
|
||||
let harness = TenantHarness::create("merge_iterator_delta_merge")
|
||||
.await
|
||||
@@ -578,7 +576,6 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn delta_image_mixed_merge() {
|
||||
use bytes::Bytes;
|
||||
use pageserver_api::value::Value;
|
||||
|
||||
let harness = TenantHarness::create("merge_iterator_delta_image_mixed_merge")
|
||||
.await
|
||||
|
||||
@@ -56,8 +56,6 @@ use pageserver_api::models::{
|
||||
};
|
||||
use pageserver_api::reltag::{BlockNumber, RelTag};
|
||||
use pageserver_api::shard::{ShardIdentity, ShardIndex, ShardNumber, TenantShardId};
|
||||
#[cfg(test)]
|
||||
use pageserver_api::value::Value;
|
||||
use postgres_connection::PgConnectionConfig;
|
||||
use postgres_ffi::v14::xlog_utils;
|
||||
use postgres_ffi::{WAL_SEGMENT_SIZE, to_pg_timestamp};
|
||||
@@ -81,6 +79,8 @@ use utils::seqwait::SeqWait;
|
||||
use utils::simple_rcu::{Rcu, RcuReadGuard};
|
||||
use utils::sync::gate::{Gate, GateGuard};
|
||||
use utils::{completion, critical, fs_ext, pausable_failpoint};
|
||||
#[cfg(test)]
|
||||
use wal_decoder::models::value::Value;
|
||||
use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta};
|
||||
|
||||
use self::delete::DeleteTimelineFlow;
|
||||
@@ -3422,10 +3422,6 @@ impl Timeline {
|
||||
// TenantShard::create_timeline will wait for these uploads to happen before returning, or
|
||||
// on retry.
|
||||
|
||||
// Now that we have the full layer map, we may calculate the visibility of layers within it (a global scan)
|
||||
drop(guard); // drop write lock, update_layer_visibility will take a read lock.
|
||||
self.update_layer_visibility().await?;
|
||||
|
||||
info!(
|
||||
"loaded layer map with {} layers at {}, total physical size: {}",
|
||||
num_layers, disk_consistent_lsn, total_physical_size
|
||||
@@ -5211,7 +5207,11 @@ impl Timeline {
|
||||
}
|
||||
|
||||
let (dense_ks, sparse_ks) = self.collect_keyspace(lsn, ctx).await?;
|
||||
let dense_partitioning = dense_ks.partition(&self.shard_identity, partition_size);
|
||||
let dense_partitioning = dense_ks.partition(
|
||||
&self.shard_identity,
|
||||
partition_size,
|
||||
postgres_ffi::BLCKSZ as u64,
|
||||
);
|
||||
let sparse_partitioning = SparseKeyPartitioning {
|
||||
parts: vec![sparse_ks],
|
||||
}; // no partitioning for metadata keys for now
|
||||
@@ -5939,7 +5939,7 @@ impl Drop for Timeline {
|
||||
if let Ok(mut gc_info) = ancestor.gc_info.write() {
|
||||
if !gc_info.remove_child_not_offloaded(self.timeline_id) {
|
||||
tracing::error!(tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id,
|
||||
"Couldn't remove retain_lsn entry from offloaded timeline's parent: already removed");
|
||||
"Couldn't remove retain_lsn entry from timeline's parent on drop: already removed");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -6543,7 +6543,7 @@ impl Timeline {
|
||||
|
||||
debug!("retain_lsns: {:?}", retain_lsns);
|
||||
|
||||
let mut layers_to_remove = Vec::new();
|
||||
let max_retain_lsn = retain_lsns.iter().max();
|
||||
|
||||
// Scan all layers in the timeline (remote or on-disk).
|
||||
//
|
||||
@@ -6553,108 +6553,110 @@ impl Timeline {
|
||||
// 3. it doesn't need to be retained for 'retain_lsns';
|
||||
// 4. it does not need to be kept for LSNs holding valid leases.
|
||||
// 5. newer on-disk image layers cover the layer's whole key range
|
||||
//
|
||||
// TODO holding a write lock is too aggressive and avoidable
|
||||
let mut guard = self
|
||||
.layers
|
||||
.write(LayerManagerLockHolder::GarbageCollection)
|
||||
.await;
|
||||
let layers = guard.layer_map()?;
|
||||
'outer: for l in layers.iter_historic_layers() {
|
||||
result.layers_total += 1;
|
||||
let layers_to_remove = {
|
||||
let mut layers_to_remove = Vec::new();
|
||||
|
||||
// 1. Is it newer than GC horizon cutoff point?
|
||||
if l.get_lsn_range().end > space_cutoff {
|
||||
info!(
|
||||
"keeping {} because it's newer than space_cutoff {}",
|
||||
l.layer_name(),
|
||||
space_cutoff,
|
||||
);
|
||||
result.layers_needed_by_cutoff += 1;
|
||||
continue 'outer;
|
||||
}
|
||||
let guard = self
|
||||
.layers
|
||||
.read(LayerManagerLockHolder::GarbageCollection)
|
||||
.await;
|
||||
let layers = guard.layer_map()?;
|
||||
'outer: for l in layers.iter_historic_layers() {
|
||||
result.layers_total += 1;
|
||||
|
||||
// 2. Is it newer than the PiTR cutoff point?
|
||||
if l.get_lsn_range().end > time_cutoff {
|
||||
info!(
|
||||
"keeping {} because it's newer than time_cutoff {}",
|
||||
l.layer_name(),
|
||||
time_cutoff,
|
||||
);
|
||||
result.layers_needed_by_pitr += 1;
|
||||
continue 'outer;
|
||||
}
|
||||
|
||||
// 3. Is it needed by a child branch?
|
||||
// NOTE With that we would keep data that
|
||||
// might be referenced by child branches forever.
|
||||
// We can track this in child timeline GC and delete parent layers when
|
||||
// they are no longer needed. This might be complicated with long inheritance chains.
|
||||
//
|
||||
// TODO Vec is not a great choice for `retain_lsns`
|
||||
for retain_lsn in &retain_lsns {
|
||||
// start_lsn is inclusive
|
||||
if &l.get_lsn_range().start <= retain_lsn {
|
||||
info!(
|
||||
"keeping {} because it's still might be referenced by child branch forked at {} is_dropped: xx is_incremental: {}",
|
||||
// 1. Is it newer than GC horizon cutoff point?
|
||||
if l.get_lsn_range().end > space_cutoff {
|
||||
debug!(
|
||||
"keeping {} because it's newer than space_cutoff {}",
|
||||
l.layer_name(),
|
||||
retain_lsn,
|
||||
l.is_incremental(),
|
||||
space_cutoff,
|
||||
);
|
||||
result.layers_needed_by_branches += 1;
|
||||
result.layers_needed_by_cutoff += 1;
|
||||
continue 'outer;
|
||||
}
|
||||
}
|
||||
|
||||
// 4. Is there a valid lease that requires us to keep this layer?
|
||||
if let Some(lsn) = &max_lsn_with_valid_lease {
|
||||
// keep if layer start <= any of the leases
|
||||
if &l.get_lsn_range().start <= lsn {
|
||||
info!(
|
||||
"keeping {} because there is a valid lease preventing GC at {}",
|
||||
// 2. Is it newer than the PiTR cutoff point?
|
||||
if l.get_lsn_range().end > time_cutoff {
|
||||
debug!(
|
||||
"keeping {} because it's newer than time_cutoff {}",
|
||||
l.layer_name(),
|
||||
lsn,
|
||||
time_cutoff,
|
||||
);
|
||||
result.layers_needed_by_leases += 1;
|
||||
result.layers_needed_by_pitr += 1;
|
||||
continue 'outer;
|
||||
}
|
||||
|
||||
// 3. Is it needed by a child branch?
|
||||
// NOTE With that we would keep data that
|
||||
// might be referenced by child branches forever.
|
||||
// We can track this in child timeline GC and delete parent layers when
|
||||
// they are no longer needed. This might be complicated with long inheritance chains.
|
||||
if let Some(retain_lsn) = max_retain_lsn {
|
||||
// start_lsn is inclusive
|
||||
if &l.get_lsn_range().start <= retain_lsn {
|
||||
debug!(
|
||||
"keeping {} because it's still might be referenced by child branch forked at {} is_dropped: xx is_incremental: {}",
|
||||
l.layer_name(),
|
||||
retain_lsn,
|
||||
l.is_incremental(),
|
||||
);
|
||||
result.layers_needed_by_branches += 1;
|
||||
continue 'outer;
|
||||
}
|
||||
}
|
||||
|
||||
// 4. Is there a valid lease that requires us to keep this layer?
|
||||
if let Some(lsn) = &max_lsn_with_valid_lease {
|
||||
// keep if layer start <= any of the leases
|
||||
if &l.get_lsn_range().start <= lsn {
|
||||
debug!(
|
||||
"keeping {} because there is a valid lease preventing GC at {}",
|
||||
l.layer_name(),
|
||||
lsn,
|
||||
);
|
||||
result.layers_needed_by_leases += 1;
|
||||
continue 'outer;
|
||||
}
|
||||
}
|
||||
|
||||
// 5. Is there a later on-disk layer for this relation?
|
||||
//
|
||||
// The end-LSN is exclusive, while disk_consistent_lsn is
|
||||
// inclusive. For example, if disk_consistent_lsn is 100, it is
|
||||
// OK for a delta layer to have end LSN 101, but if the end LSN
|
||||
// is 102, then it might not have been fully flushed to disk
|
||||
// before crash.
|
||||
//
|
||||
// For example, imagine that the following layers exist:
|
||||
//
|
||||
// 1000 - image (A)
|
||||
// 1000-2000 - delta (B)
|
||||
// 2000 - image (C)
|
||||
// 2000-3000 - delta (D)
|
||||
// 3000 - image (E)
|
||||
//
|
||||
// If GC horizon is at 2500, we can remove layers A and B, but
|
||||
// we cannot remove C, even though it's older than 2500, because
|
||||
// the delta layer 2000-3000 depends on it.
|
||||
if !layers
|
||||
.image_layer_exists(&l.get_key_range(), &(l.get_lsn_range().end..new_gc_cutoff))
|
||||
{
|
||||
debug!("keeping {} because it is the latest layer", l.layer_name());
|
||||
result.layers_not_updated += 1;
|
||||
continue 'outer;
|
||||
}
|
||||
|
||||
// We didn't find any reason to keep this file, so remove it.
|
||||
info!(
|
||||
"garbage collecting {} is_dropped: xx is_incremental: {}",
|
||||
l.layer_name(),
|
||||
l.is_incremental(),
|
||||
);
|
||||
layers_to_remove.push(l);
|
||||
}
|
||||
|
||||
// 5. Is there a later on-disk layer for this relation?
|
||||
//
|
||||
// The end-LSN is exclusive, while disk_consistent_lsn is
|
||||
// inclusive. For example, if disk_consistent_lsn is 100, it is
|
||||
// OK for a delta layer to have end LSN 101, but if the end LSN
|
||||
// is 102, then it might not have been fully flushed to disk
|
||||
// before crash.
|
||||
//
|
||||
// For example, imagine that the following layers exist:
|
||||
//
|
||||
// 1000 - image (A)
|
||||
// 1000-2000 - delta (B)
|
||||
// 2000 - image (C)
|
||||
// 2000-3000 - delta (D)
|
||||
// 3000 - image (E)
|
||||
//
|
||||
// If GC horizon is at 2500, we can remove layers A and B, but
|
||||
// we cannot remove C, even though it's older than 2500, because
|
||||
// the delta layer 2000-3000 depends on it.
|
||||
if !layers
|
||||
.image_layer_exists(&l.get_key_range(), &(l.get_lsn_range().end..new_gc_cutoff))
|
||||
{
|
||||
info!("keeping {} because it is the latest layer", l.layer_name());
|
||||
result.layers_not_updated += 1;
|
||||
continue 'outer;
|
||||
}
|
||||
|
||||
// We didn't find any reason to keep this file, so remove it.
|
||||
info!(
|
||||
"garbage collecting {} is_dropped: xx is_incremental: {}",
|
||||
l.layer_name(),
|
||||
l.is_incremental(),
|
||||
);
|
||||
layers_to_remove.push(l);
|
||||
}
|
||||
layers_to_remove
|
||||
};
|
||||
|
||||
if !layers_to_remove.is_empty() {
|
||||
// Persist the new GC cutoff value before we actually remove anything.
|
||||
@@ -6670,15 +6672,19 @@ impl Timeline {
|
||||
}
|
||||
})?;
|
||||
|
||||
let mut guard = self
|
||||
.layers
|
||||
.write(LayerManagerLockHolder::GarbageCollection)
|
||||
.await;
|
||||
|
||||
let gc_layers = layers_to_remove
|
||||
.iter()
|
||||
.map(|x| guard.get_from_desc(x))
|
||||
.flat_map(|desc| guard.try_get_from_key(&desc.key()).cloned())
|
||||
.collect::<Vec<Layer>>();
|
||||
|
||||
result.layers_removed = gc_layers.len() as u64;
|
||||
|
||||
self.remote_client.schedule_gc_update(&gc_layers)?;
|
||||
|
||||
guard.open_mut()?.finish_gc_timeline(&gc_layers);
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
@@ -7594,11 +7600,11 @@ mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::value::Value;
|
||||
use std::iter::Iterator;
|
||||
use tracing::Instrument;
|
||||
use utils::id::TimelineId;
|
||||
use utils::lsn::Lsn;
|
||||
use wal_decoder::models::value::Value;
|
||||
|
||||
use super::HeatMapTimeline;
|
||||
use crate::context::RequestContextBuilder;
|
||||
|
||||
@@ -29,9 +29,7 @@ use pageserver_api::config::tenant_conf_defaults::DEFAULT_CHECKPOINT_DISTANCE;
|
||||
use pageserver_api::key::{KEY_SIZE, Key};
|
||||
use pageserver_api::keyspace::{KeySpace, ShardedRange};
|
||||
use pageserver_api::models::{CompactInfoResponse, CompactKeyRange};
|
||||
use pageserver_api::record::NeonWalRecord;
|
||||
use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
|
||||
use pageserver_api::value::Value;
|
||||
use pageserver_compaction::helpers::{fully_contains, overlaps_with};
|
||||
use pageserver_compaction::interface::*;
|
||||
use serde::Serialize;
|
||||
@@ -41,6 +39,8 @@ use tracing::{Instrument, debug, error, info, info_span, trace, warn};
|
||||
use utils::critical;
|
||||
use utils::id::TimelineId;
|
||||
use utils::lsn::Lsn;
|
||||
use wal_decoder::models::record::NeonWalRecord;
|
||||
use wal_decoder::models::value::Value;
|
||||
|
||||
use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
|
||||
use crate::page_cache;
|
||||
|
||||
@@ -36,8 +36,8 @@ use pageserver_api::keyspace::{ShardedRange, singleton_range};
|
||||
use pageserver_api::models::{ShardImportProgress, ShardImportProgressV1, ShardImportStatus};
|
||||
use pageserver_api::reltag::{RelTag, SlruKind};
|
||||
use pageserver_api::shard::ShardIdentity;
|
||||
use postgres_ffi::BLCKSZ;
|
||||
use postgres_ffi::relfile_utils::parse_relfilename;
|
||||
use postgres_ffi::{BLCKSZ, pg_constants};
|
||||
use remote_storage::RemotePath;
|
||||
use tokio::sync::Semaphore;
|
||||
use tokio_stream::StreamExt;
|
||||
@@ -558,7 +558,7 @@ impl PgDataDir {
|
||||
PgDataDirDb::new(
|
||||
storage,
|
||||
&basedir.join(dboid.to_string()),
|
||||
pg_constants::DEFAULTTABLESPACE_OID,
|
||||
postgres_ffi_types::constants::DEFAULTTABLESPACE_OID,
|
||||
dboid,
|
||||
&datadir_path,
|
||||
)
|
||||
@@ -571,7 +571,7 @@ impl PgDataDir {
|
||||
PgDataDirDb::new(
|
||||
storage,
|
||||
&datadir_path.join("global"),
|
||||
postgres_ffi::pg_constants::GLOBALTABLESPACE_OID,
|
||||
postgres_ffi_types::constants::GLOBALTABLESPACE_OID,
|
||||
0,
|
||||
&datadir_path,
|
||||
)
|
||||
|
||||
@@ -28,20 +28,20 @@ use std::time::{Duration, Instant, SystemTime};
|
||||
|
||||
use bytes::{Buf, Bytes};
|
||||
use pageserver_api::key::{Key, rel_block_to_key};
|
||||
use pageserver_api::record::NeonWalRecord;
|
||||
use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
|
||||
use pageserver_api::shard::ShardIdentity;
|
||||
use postgres_ffi::relfile_utils::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
|
||||
use postgres_ffi::walrecord::*;
|
||||
use postgres_ffi::{
|
||||
TimestampTz, TransactionId, dispatch_pgversion, enum_pgversion, enum_pgversion_dispatch,
|
||||
fsm_logical_to_physical, pg_constants,
|
||||
};
|
||||
use postgres_ffi_types::forknum::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
|
||||
use tracing::*;
|
||||
use utils::bin_ser::{DeserializeError, SerializeError};
|
||||
use utils::lsn::Lsn;
|
||||
use utils::rate_limit::RateLimit;
|
||||
use utils::{critical, failpoint_support};
|
||||
use wal_decoder::models::record::NeonWalRecord;
|
||||
use wal_decoder::models::*;
|
||||
|
||||
use crate::ZERO_PAGE;
|
||||
|
||||
@@ -32,12 +32,12 @@ use anyhow::Context;
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::models::{WalRedoManagerProcessStatus, WalRedoManagerStatus};
|
||||
use pageserver_api::record::NeonWalRecord;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use tracing::*;
|
||||
use utils::lsn::Lsn;
|
||||
use utils::sync::gate::GateError;
|
||||
use utils::sync::heavier_once_cell;
|
||||
use wal_decoder::models::record::NeonWalRecord;
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
use crate::metrics::{
|
||||
@@ -571,11 +571,11 @@ mod tests {
|
||||
|
||||
use bytes::Bytes;
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::record::NeonWalRecord;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use tracing::Instrument;
|
||||
use utils::id::TenantId;
|
||||
use utils::lsn::Lsn;
|
||||
use wal_decoder::models::record::NeonWalRecord;
|
||||
|
||||
use super::PostgresRedoManager;
|
||||
use crate::config::PageServerConf;
|
||||
|
||||
@@ -2,16 +2,16 @@ use anyhow::Context;
|
||||
use byteorder::{ByteOrder, LittleEndian};
|
||||
use bytes::BytesMut;
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::record::NeonWalRecord;
|
||||
use pageserver_api::reltag::SlruKind;
|
||||
use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM;
|
||||
use postgres_ffi::v14::nonrelfile_utils::{
|
||||
mx_offset_to_flags_bitshift, mx_offset_to_flags_offset, mx_offset_to_member_offset,
|
||||
transaction_id_set_status,
|
||||
};
|
||||
use postgres_ffi::{BLCKSZ, pg_constants};
|
||||
use postgres_ffi_types::forknum::VISIBILITYMAP_FORKNUM;
|
||||
use tracing::*;
|
||||
use utils::lsn::Lsn;
|
||||
use wal_decoder::models::record::NeonWalRecord;
|
||||
|
||||
/// Can this request be served by neon redo functions
|
||||
/// or we need to pass it to wal-redo postgres process?
|
||||
|
||||
@@ -10,7 +10,6 @@ use std::time::Duration;
|
||||
|
||||
use anyhow::Context;
|
||||
use bytes::Bytes;
|
||||
use pageserver_api::record::NeonWalRecord;
|
||||
use pageserver_api::reltag::RelTag;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use postgres_ffi::BLCKSZ;
|
||||
@@ -18,6 +17,7 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tracing::{Instrument, debug, error, instrument};
|
||||
use utils::lsn::Lsn;
|
||||
use utils::poison::Poison;
|
||||
use wal_decoder::models::record::NeonWalRecord;
|
||||
|
||||
use self::no_leak_child::NoLeakChild;
|
||||
use crate::config::PageServerConf;
|
||||
|
||||
28
pgxn/Makefile
Normal file
28
pgxn/Makefile
Normal file
@@ -0,0 +1,28 @@
|
||||
# This makefile assumes that 'pg_config' is in the path, or is passed in the
|
||||
# PG_CONFIG variable.
|
||||
#
|
||||
# This is used in two different ways:
|
||||
#
|
||||
# 1. The main makefile calls this, when you invoke the `make neon-pg-ext-%`
|
||||
# target. It passes PG_CONFIG pointing to pg_install/%/bin/pg_config.
|
||||
# This is a VPATH build; the current directory is build/pgxn-%, and
|
||||
# the path to the Makefile is passed with the -f argument.
|
||||
#
|
||||
# 2. compute-node.Dockerfile invokes this to build the compute extensions
|
||||
# for the specific Postgres version. It relies on pg_config already
|
||||
# being in $(PATH).
|
||||
|
||||
# Directory that contains this Makefile (keeps its trailing slash). The
# per-extension Makefiles are looked up relative to it.
srcdir := $(dir $(firstword $(MAKEFILE_LIST)))

# pg_config to build against; found via PATH unless PG_CONFIG is passed in.
PG_CONFIG := pg_config

# Every extension this Makefile knows how to build and install.
subdirs := neon neon_rmgr neon_walredo neon_utils neon_test_utils

# Aggregate targets: each one only selects a subset of the extensions.
.PHONY: install install-compute install-storage
install: $(subdirs)
install-compute: neon neon_utils neon_test_utils neon_rmgr
install-storage: neon_rmgr neon_walredo

# One rule shared by all extensions; $@ is the extension name.
# The build directory of that name is created under the current directory
# if missing, then a sub-make runs the extension's `install` target inside
# it. The extension's Makefile is passed with -f as an absolute path, since
# -C switches into the build directory.
.PHONY: $(subdirs)
$(subdirs):
	mkdir -p $@
	$(MAKE) PG_CONFIG=$(PG_CONFIG) -C $@ -f $(abspath $(srcdir)/$@/Makefile) install
|
||||
@@ -1,6 +1,6 @@
|
||||
# neon extension
|
||||
comment = 'cloud storage for PostgreSQL'
|
||||
default_version = '1.5'
|
||||
default_version = '1.6'
|
||||
module_pathname = '$libdir/neon'
|
||||
relocatable = true
|
||||
trusted = true
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user