Compare commits

..

3 Commits

Author SHA1 Message Date
Conrad Ludgate
edea436191 only one client, so only one channel pair 2025-05-21 22:54:56 +01:00
Conrad Ludgate
4b0c7f9530 remove arc around inner client 2025-05-21 21:58:03 +01:00
Conrad Ludgate
ab61864df1 proxy(tokio-postgres): move statement cleanup to client drop 2025-05-21 21:53:57 +01:00
51 changed files with 826 additions and 2319 deletions

View File

@@ -314,8 +314,7 @@ jobs:
test_selection: performance
run_in_parallel: false
save_perf_report: ${{ github.ref_name == 'main' }}
# test_pageserver_max_throughput_getpage_at_latest_lsn is run in separate workflow periodic_pagebench.yml because it needs snapshots
extra_params: --splits 5 --group ${{ matrix.pytest_split_group }} --ignore=test_runner/performance/pageserver/pagebench/test_pageserver_max_throughput_getpage_at_latest_lsn.py
extra_params: --splits 5 --group ${{ matrix.pytest_split_group }}
benchmark_durations: ${{ needs.get-benchmarks-durations.outputs.json }}
pg_version: v16
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}

View File

@@ -1,4 +1,4 @@
name: Periodic pagebench performance test on unit-perf hetzner runner
name: Periodic pagebench performance test on dedicated EC2 machine in eu-central-1 region
on:
schedule:
@@ -8,7 +8,7 @@ on:
# │ │ ┌───────────── day of the month (1 - 31)
# │ │ │ ┌───────────── month (1 - 12 or JAN-DEC)
# │ │ │ │ ┌───────────── day of the week (0 - 6 or SUN-SAT)
- cron: '0 */4 * * *' # Runs every 4 hours
- cron: '0 */3 * * *' # Runs every 3 hours
workflow_dispatch: # Allows manual triggering of the workflow
inputs:
commit_hash:
@@ -16,11 +16,6 @@ on:
description: 'The long neon repo commit hash for the system under test (pageserver) to be tested.'
required: false
default: ''
recreate_snapshots:
type: boolean
description: 'Recreate snapshots - !!!WARNING!!! We should only recreate snapshots if the previous ones are no longer compatible. Otherwise benchmarking results are not comparable across runs.'
required: false
default: false
defaults:
run:
@@ -34,13 +29,13 @@ permissions:
contents: read
jobs:
run_periodic_pagebench_test:
trigger_bench_on_ec2_machine_in_eu_central_1:
permissions:
id-token: write # aws-actions/configure-aws-credentials
statuses: write
contents: write
pull-requests: write
runs-on: [ self-hosted, unit-perf ]
runs-on: [ self-hosted, small ]
container:
image: ghcr.io/neondatabase/build-tools:pinned-bookworm
credentials:
@@ -49,13 +44,10 @@ jobs:
options: --init
timeout-minutes: 360 # Set the timeout to 6 hours
env:
API_KEY: ${{ secrets.PERIODIC_PAGEBENCH_EC2_RUNNER_API_KEY }}
RUN_ID: ${{ github.run_id }}
DEFAULT_PG_VERSION: 16
BUILD_TYPE: release
RUST_BACKTRACE: 1
# NEON_ENV_BUILDER_USE_OVERLAYFS_FOR_SNAPSHOTS: 1 - doesn't work without root in container
S3_BUCKET: neon-github-public-dev
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
AWS_DEFAULT_REGION : "eu-central-1"
AWS_INSTANCE_ID : "i-02a59a3bf86bc7e74"
steps:
# we don't need the neon source code because we run everything remotely
# however we still need the local github actions to run the allure step below
@@ -64,194 +56,99 @@ jobs:
with:
egress-policy: audit
- name: Set up the environment which depends on $RUNNER_TEMP on nvme drive
id: set-env
shell: bash -euxo pipefail {0}
run: |
{
echo "NEON_DIR=${RUNNER_TEMP}/neon"
echo "NEON_BIN=${RUNNER_TEMP}/neon/bin"
echo "POSTGRES_DISTRIB_DIR=${RUNNER_TEMP}/neon/pg_install"
echo "LD_LIBRARY_PATH=${RUNNER_TEMP}/neon/pg_install/v${DEFAULT_PG_VERSION}/lib"
echo "BACKUP_DIR=${RUNNER_TEMP}/instance_store/saved_snapshots"
echo "TEST_OUTPUT=${RUNNER_TEMP}/neon/test_output"
echo "PERF_REPORT_DIR=${RUNNER_TEMP}/neon/test_output/perf-report-local"
echo "ALLURE_DIR=${RUNNER_TEMP}/neon/test_output/allure-results"
echo "ALLURE_RESULTS_DIR=${RUNNER_TEMP}/neon/test_output/allure-results/results"
} >> "$GITHUB_ENV"
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
echo "allure_results_dir=${RUNNER_TEMP}/neon/test_output/allure-results/results" >> "$GITHUB_OUTPUT"
- name: Show my own (github runner) external IP address - usefull for IP allowlisting
run: curl https://ifconfig.me
- uses: aws-actions/configure-aws-credentials@e3dd6a429d7300a6a4c196c26e071d42e0343502 # v4.0.2
- name: Assume AWS OIDC role that allows to manage (start/stop/describe... EC machine)
uses: aws-actions/configure-aws-credentials@e3dd6a429d7300a6a4c196c26e071d42e0343502 # v4.0.2
with:
aws-region: eu-central-1
role-to-assume: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
role-duration-seconds: 18000 # max 5 hours (needed in case commit hash is still being built)
role-to-assume: ${{ vars.DEV_AWS_OIDC_ROLE_MANAGE_BENCHMARK_EC2_VMS_ARN }}
role-duration-seconds: 3600
- name: Start EC2 instance and wait for the instance to boot up
run: |
aws ec2 start-instances --instance-ids $AWS_INSTANCE_ID
aws ec2 wait instance-running --instance-ids $AWS_INSTANCE_ID
sleep 60 # sleep some time to allow cloudinit and our API server to start up
- name: Determine public IP of the EC2 instance and set env variable EC2_MACHINE_URL_US
run: |
public_ip=$(aws ec2 describe-instances --instance-ids $AWS_INSTANCE_ID --query 'Reservations[*].Instances[*].PublicIpAddress' --output text)
echo "Public IP of the EC2 instance: $public_ip"
echo "EC2_MACHINE_URL_US=https://${public_ip}:8443" >> $GITHUB_ENV
- name: Determine commit hash
id: commit_hash
shell: bash -euxo pipefail {0}
env:
INPUT_COMMIT_HASH: ${{ github.event.inputs.commit_hash }}
run: |
if [[ -z "${INPUT_COMMIT_HASH}" ]]; then
COMMIT_HASH=$(curl -s https://api.github.com/repos/neondatabase/neon/commits/main | jq -r '.sha')
echo "COMMIT_HASH=$COMMIT_HASH" >> $GITHUB_ENV
echo "commit_hash=$COMMIT_HASH" >> "$GITHUB_OUTPUT"
if [ -z "$INPUT_COMMIT_HASH" ]; then
echo "COMMIT_HASH=$(curl -s https://api.github.com/repos/neondatabase/neon/commits/main | jq -r '.sha')" >> $GITHUB_ENV
echo "COMMIT_HASH_TYPE=latest" >> $GITHUB_ENV
else
COMMIT_HASH="${INPUT_COMMIT_HASH}"
echo "COMMIT_HASH=$COMMIT_HASH" >> $GITHUB_ENV
echo "commit_hash=$COMMIT_HASH" >> "$GITHUB_OUTPUT"
echo "COMMIT_HASH=$INPUT_COMMIT_HASH" >> $GITHUB_ENV
echo "COMMIT_HASH_TYPE=manual" >> $GITHUB_ENV
fi
- name: Checkout the neon repository at given commit hash
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
ref: ${{ steps.commit_hash.outputs.commit_hash }}
# does not reuse ./.github/actions/download because we need to download the artifact for the given commit hash
# example artifact
# s3://neon-github-public-dev/artifacts/48b870bc078bd2c450eb7b468e743b9c118549bf/15036827400/1/neon-Linux-X64-release-artifact.tar.zst /instance_store/artifacts/neon-Linux-release-artifact.tar.zst
- name: Determine artifact S3_KEY for given commit hash and download and extract artifact
id: artifact_prefix
shell: bash -euxo pipefail {0}
env:
ARCHIVE: ${{ runner.temp }}/downloads/neon-${{ runner.os }}-${{ runner.arch }}-release-artifact.tar.zst
COMMIT_HASH: ${{ env.COMMIT_HASH }}
COMMIT_HASH_TYPE: ${{ env.COMMIT_HASH_TYPE }}
- name: Start Bench with run_id
run: |
attempt=0
max_attempts=24 # 5 minutes * 24 = 2 hours
curl -k -X 'POST' \
"${EC2_MACHINE_URL_US}/start_test/${GITHUB_RUN_ID}" \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-H "Authorization: Bearer $API_KEY" \
-d "{\"neonRepoCommitHash\": \"${COMMIT_HASH}\", \"neonRepoCommitHashType\": \"${COMMIT_HASH_TYPE}\"}"
while [[ $attempt -lt $max_attempts ]]; do
# the following command will fail until the artifacts are available ...
S3_KEY=$(aws s3api list-objects-v2 --bucket "$S3_BUCKET" --prefix "artifacts/$COMMIT_HASH/" \
| jq -r '.Contents[]?.Key' \
| grep "neon-${{ runner.os }}-${{ runner.arch }}-release-artifact.tar.zst" \
| sort --version-sort \
| tail -1) || true # ... thus ignore errors from the command
if [[ -n "${S3_KEY}" ]]; then
echo "Artifact found: $S3_KEY"
echo "S3_KEY=$S3_KEY" >> $GITHUB_ENV
- name: Poll Test Status
id: poll_step
run: |
status=""
while [[ "$status" != "failure" && "$status" != "success" ]]; do
response=$(curl -k -X 'GET' \
"${EC2_MACHINE_URL_US}/test_status/${GITHUB_RUN_ID}" \
-H 'accept: application/json' \
-H "Authorization: Bearer $API_KEY")
echo "Response: $response"
set +x
status=$(echo $response | jq -r '.status')
echo "Test status: $status"
if [[ "$status" == "failure" ]]; then
echo "Test failed"
exit 1 # Fail the job step if status is failure
elif [[ "$status" == "success" || "$status" == "null" ]]; then
break
elif [[ "$status" == "too_many_runs" ]]; then
echo "Too many runs already running"
echo "too_many_runs=true" >> "$GITHUB_OUTPUT"
exit 1
fi
# Increment attempt counter and sleep for 5 minutes
attempt=$((attempt + 1))
echo "Attempt $attempt of $max_attempts to find artifacts in S3 bucket s3://$S3_BUCKET/artifacts/$COMMIT_HASH failed. Retrying in 5 minutes..."
sleep 300 # Sleep for 5 minutes
sleep 60 # Poll every 60 seconds
done
if [[ -z "${S3_KEY}" ]]; then
echo "Error: artifact not found in S3 bucket s3://$S3_BUCKET/artifacts/$COMMIT_HASH" after 2 hours
else
mkdir -p $(dirname $ARCHIVE)
time aws s3 cp --only-show-errors s3://$S3_BUCKET/${S3_KEY} ${ARCHIVE}
mkdir -p ${NEON_DIR}
time tar -xf ${ARCHIVE} -C ${NEON_DIR}
rm -f ${ARCHIVE}
fi
- name: Download snapshots from S3
if: ${{ github.event_name != 'workflow_dispatch' || github.event.inputs.recreate_snapshots == 'false' || github.event.inputs.recreate_snapshots == '' }}
id: download_snapshots
shell: bash -euxo pipefail {0}
- name: Retrieve Test Logs
if: always() && steps.poll_step.outputs.too_many_runs != 'true'
run: |
# Download the snapshots from S3
mkdir -p ${TEST_OUTPUT}
mkdir -p $BACKUP_DIR
cd $BACKUP_DIR
mkdir parts
cd parts
PART=$(aws s3api list-objects-v2 --bucket $S3_BUCKET --prefix performance/pagebench/ \
| jq -r '.Contents[]?.Key' \
| grep -E 'shared-snapshots-[0-9]{4}-[0-9]{2}-[0-9]{2}' \
| sort \
| tail -1)
echo "Latest PART: $PART"
if [[ -z "$PART" ]]; then
echo "ERROR: No matching S3 key found" >&2
exit 1
fi
S3_KEY=$(dirname $PART)
time aws s3 cp --only-show-errors --recursive s3://${S3_BUCKET}/$S3_KEY/ .
cd $TEST_OUTPUT
time cat $BACKUP_DIR/parts/* | zstdcat | tar --extract --preserve-permissions
rm -rf ${BACKUP_DIR}
curl -k -X 'GET' \
"${EC2_MACHINE_URL_US}/test_log/${GITHUB_RUN_ID}" \
-H 'accept: application/gzip' \
-H "Authorization: Bearer $API_KEY" \
--output "test_log_${GITHUB_RUN_ID}.gz"
- name: Cache poetry deps
uses: actions/cache@v4
with:
path: ~/.cache/pypoetry/virtualenvs
key: v2-${{ runner.os }}-${{ runner.arch }}-python-deps-bookworm-${{ hashFiles('poetry.lock') }}
- name: Install Python deps
shell: bash -euxo pipefail {0}
run: ./scripts/pysync
# we need high number of open files for pagebench
- name: show ulimits
shell: bash -euxo pipefail {0}
- name: Unzip Test Log and Print it into this job's log
if: always() && steps.poll_step.outputs.too_many_runs != 'true'
run: |
ulimit -a
- name: Run pagebench testcase
shell: bash -euxo pipefail {0}
env:
CI: false # need to override this env variable set by github to enforce using snapshots
run: |
export PLATFORM=hetzner-unit-perf-${COMMIT_HASH_TYPE}
# report the commit hash of the neon repository in the revision of the test results
export GITHUB_SHA=${COMMIT_HASH}
rm -rf ${PERF_REPORT_DIR}
rm -rf ${ALLURE_RESULTS_DIR}
mkdir -p ${PERF_REPORT_DIR}
mkdir -p ${ALLURE_RESULTS_DIR}
PARAMS="--alluredir=${ALLURE_RESULTS_DIR} --tb=short --verbose -rA"
EXTRA_PARAMS="--out-dir ${PERF_REPORT_DIR} --durations-path $TEST_OUTPUT/benchmark_durations.json"
# run only two selected tests
# environment set by parent:
# RUST_BACKTRACE=1 DEFAULT_PG_VERSION=16 BUILD_TYPE=release
./scripts/pytest ${PARAMS} test_runner/performance/pageserver/pagebench/test_pageserver_max_throughput_getpage_at_latest_lsn.py::test_pageserver_characterize_throughput_with_n_tenants ${EXTRA_PARAMS}
./scripts/pytest ${PARAMS} test_runner/performance/pageserver/pagebench/test_pageserver_max_throughput_getpage_at_latest_lsn.py::test_pageserver_characterize_latencies_with_1_client_and_throughput_with_many_clients_one_tenant ${EXTRA_PARAMS}
- name: upload the performance metrics to the Neon performance database which is used by grafana dashboards to display the results
shell: bash -euxo pipefail {0}
run: |
export REPORT_FROM="$PERF_REPORT_DIR"
export GITHUB_SHA=${COMMIT_HASH}
time ./scripts/generate_and_push_perf_report.sh
- name: Upload test results
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report-store
with:
report-dir: ${{ steps.set-env.outputs.allure_results_dir }}
unique-key: ${{ env.BUILD_TYPE }}-${{ env.DEFAULT_PG_VERSION }}-${{ runner.arch }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
gzip -d "test_log_${GITHUB_RUN_ID}.gz"
cat "test_log_${GITHUB_RUN_ID}"
- name: Create Allure report
id: create-allure-report
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report-generate
with:
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Upload snapshots
if: ${{ github.event_name == 'workflow_dispatch' && github.event.inputs.recreate_snapshots != 'false' && github.event.inputs.recreate_snapshots != '' }}
id: upload_snapshots
shell: bash -euxo pipefail {0}
run: |
mkdir -p $BACKUP_DIR
cd $TEST_OUTPUT
tar --create --preserve-permissions --file - shared-snapshots | zstd -o $BACKUP_DIR/shared_snapshots.tar.zst
cd $BACKUP_DIR
mkdir parts
split -b 1G shared_snapshots.tar.zst ./parts/shared_snapshots.tar.zst.part.
SNAPSHOT_DATE=$(date +%F) # YYYY-MM-DD
cd parts
time aws s3 cp --recursive . s3://${S3_BUCKET}/performance/pagebench/shared-snapshots-${SNAPSHOT_DATE}/
- name: Post to a Slack channel
if: ${{ github.event.schedule && failure() }}
uses: slackapi/slack-github-action@fcfb566f8b0aab22203f066d80ca1d7e4b5d05b3 # v1.27.1
@@ -260,22 +157,26 @@ jobs:
slack-message: "Periodic pagebench testing on dedicated hardware: ${{ job.status }}\n${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}"
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
- name: Cleanup Test Resources
if: always()
shell: bash -euxo pipefail {0}
env:
ARCHIVE: ${{ runner.temp }}/downloads/neon-${{ runner.os }}-${{ runner.arch }}-release-artifact.tar.zst
run: |
# Cleanup the test resources
if [[ -d "${BACKUP_DIR}" ]]; then
rm -rf ${BACKUP_DIR}
fi
if [[ -d "${TEST_OUTPUT}" ]]; then
rm -rf ${TEST_OUTPUT}
fi
if [[ -d "${NEON_DIR}" ]]; then
rm -rf ${NEON_DIR}
fi
rm -rf $(dirname $ARCHIVE)
curl -k -X 'POST' \
"${EC2_MACHINE_URL_US}/cleanup_test/${GITHUB_RUN_ID}" \
-H 'accept: application/json' \
-H "Authorization: Bearer $API_KEY" \
-d ''
- name: Assume AWS OIDC role that allows to manage (start/stop/describe... EC machine)
if: always() && steps.poll_step.outputs.too_many_runs != 'true'
uses: aws-actions/configure-aws-credentials@e3dd6a429d7300a6a4c196c26e071d42e0343502 # v4.0.2
with:
aws-region: eu-central-1
role-to-assume: ${{ vars.DEV_AWS_OIDC_ROLE_MANAGE_BENCHMARK_EC2_VMS_ARN }}
role-duration-seconds: 3600
- name: Stop EC2 instance and wait for the instance to be stopped
if: always() && steps.poll_step.outputs.too_many_runs != 'true'
run: |
aws ec2 stop-instances --instance-ids $AWS_INSTANCE_ID
aws ec2 wait instance-stopped --instance-ids $AWS_INSTANCE_ID

195
Cargo.lock generated
View File

@@ -1276,7 +1276,7 @@ version = "0.1.0"
dependencies = [
"anyhow",
"chrono",
"indexmap 2.9.0",
"indexmap 2.0.1",
"jsonwebtoken",
"regex",
"remote_storage",
@@ -1308,7 +1308,7 @@ dependencies = [
"flate2",
"futures",
"http 1.1.0",
"indexmap 2.9.0",
"indexmap 2.0.1",
"itertools 0.10.5",
"jsonwebtoken",
"metrics",
@@ -2597,7 +2597,7 @@ dependencies = [
"futures-sink",
"futures-util",
"http 0.2.9",
"indexmap 2.9.0",
"indexmap 2.0.1",
"slab",
"tokio",
"tokio-util",
@@ -2616,7 +2616,7 @@ dependencies = [
"futures-sink",
"futures-util",
"http 1.1.0",
"indexmap 2.9.0",
"indexmap 2.0.1",
"slab",
"tokio",
"tokio-util",
@@ -2863,14 +2863,14 @@ dependencies = [
"pprof",
"regex",
"routerify",
"rustls 0.23.27",
"rustls 0.23.18",
"rustls-pemfile 2.1.1",
"serde",
"serde_json",
"serde_path_to_error",
"thiserror 1.0.69",
"tokio",
"tokio-rustls 0.26.2",
"tokio-rustls 0.26.0",
"tokio-stream",
"tokio-util",
"tracing",
@@ -3200,12 +3200,12 @@ dependencies = [
[[package]]
name = "indexmap"
version = "2.9.0"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cea70ddb795996207ad57735b50c5982d8844f38ba9ee5f1aedcfb708a2aa11e"
checksum = "ad227c3af19d4914570ad36d30409928b75967c298feb9ea1969db3a610bb14e"
dependencies = [
"equivalent",
"hashbrown 0.15.2",
"hashbrown 0.14.5",
"serde",
]
@@ -3228,7 +3228,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "232929e1d75fe899576a3d5c7416ad0d88dbfbb3c3d6aa00873a7408a50ddb88"
dependencies = [
"ahash",
"indexmap 2.9.0",
"indexmap 2.0.1",
"is-terminal",
"itoa",
"log",
@@ -3251,7 +3251,7 @@ dependencies = [
"crossbeam-utils",
"dashmap 6.1.0",
"env_logger",
"indexmap 2.9.0",
"indexmap 2.0.1",
"itoa",
"log",
"num-format",
@@ -4112,7 +4112,7 @@ dependencies = [
"opentelemetry-http",
"opentelemetry-proto",
"opentelemetry_sdk",
"prost 0.13.5",
"prost 0.13.3",
"reqwest",
"thiserror 1.0.69",
]
@@ -4125,8 +4125,8 @@ checksum = "a6e05acbfada5ec79023c85368af14abd0b307c015e9064d249b2a950ef459a6"
dependencies = [
"opentelemetry",
"opentelemetry_sdk",
"prost 0.13.5",
"tonic 0.12.3",
"prost 0.13.3",
"tonic",
]
[[package]]
@@ -4321,7 +4321,6 @@ dependencies = [
"pageserver_api",
"pageserver_client",
"pageserver_compaction",
"pageserver_page_api",
"pem",
"pin-project-lite",
"postgres-protocol",
@@ -4340,7 +4339,7 @@ dependencies = [
"reqwest",
"rpds",
"rstest",
"rustls 0.23.27",
"rustls 0.23.18",
"scopeguard",
"send-future",
"serde",
@@ -4359,13 +4358,11 @@ dependencies = [
"tokio-epoll-uring",
"tokio-io-timeout",
"tokio-postgres",
"tokio-rustls 0.26.2",
"tokio-rustls 0.26.0",
"tokio-stream",
"tokio-tar",
"tokio-util",
"toml_edit",
"tonic 0.13.1",
"tonic-reflection",
"tracing",
"tracing-utils",
"twox-hash",
@@ -4458,8 +4455,8 @@ dependencies = [
name = "pageserver_page_api"
version = "0.1.0"
dependencies = [
"prost 0.13.5",
"tonic 0.13.1",
"prost 0.13.3",
"tonic",
"tonic-build",
"workspace_hack",
]
@@ -4840,14 +4837,14 @@ dependencies = [
"bytes",
"once_cell",
"pq_proto",
"rustls 0.23.27",
"rustls 0.23.18",
"rustls-pemfile 2.1.1",
"serde",
"thiserror 1.0.69",
"tokio",
"tokio-postgres",
"tokio-postgres-rustls",
"tokio-rustls 0.26.2",
"tokio-rustls 0.26.0",
"tokio-util",
"tracing",
]
@@ -4954,7 +4951,7 @@ dependencies = [
"inferno 0.12.0",
"num",
"paste",
"prost 0.13.5",
"prost 0.13.3",
]
[[package]]
@@ -5059,12 +5056,12 @@ dependencies = [
[[package]]
name = "prost"
version = "0.13.5"
version = "0.13.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5"
checksum = "7b0487d90e047de87f984913713b85c601c05609aad5b0df4b4573fbf69aa13f"
dependencies = [
"bytes",
"prost-derive 0.13.5",
"prost-derive 0.13.3",
]
[[package]]
@@ -5102,7 +5099,7 @@ dependencies = [
"once_cell",
"petgraph",
"prettyplease",
"prost 0.13.5",
"prost 0.13.3",
"prost-types 0.13.3",
"regex",
"syn 2.0.100",
@@ -5124,9 +5121,9 @@ dependencies = [
[[package]]
name = "prost-derive"
version = "0.13.5"
version = "0.13.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d"
checksum = "e9552f850d5f0964a4e4d0bf306459ac29323ddfbae05e35a7c0d35cb0803cc5"
dependencies = [
"anyhow",
"itertools 0.12.1",
@@ -5150,7 +5147,7 @@ version = "0.13.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4759aa0d3a6232fb8dbdb97b61de2c20047c68aca932c7ed76da9d788508d670"
dependencies = [
"prost 0.13.5",
"prost 0.13.3",
]
[[package]]
@@ -5198,7 +5195,7 @@ dependencies = [
"hyper 0.14.30",
"hyper 1.4.1",
"hyper-util",
"indexmap 2.9.0",
"indexmap 2.0.1",
"ipnet",
"itertools 0.10.5",
"itoa",
@@ -5232,7 +5229,7 @@ dependencies = [
"rsa",
"rstest",
"rustc-hash 1.1.0",
"rustls 0.23.27",
"rustls 0.23.18",
"rustls-native-certs 0.8.0",
"rustls-pemfile 2.1.1",
"scopeguard",
@@ -5251,7 +5248,7 @@ dependencies = [
"tokio",
"tokio-postgres",
"tokio-postgres2",
"tokio-rustls 0.26.2",
"tokio-rustls 0.26.0",
"tokio-tungstenite 0.21.0",
"tokio-util",
"tracing",
@@ -5475,13 +5472,13 @@ dependencies = [
"num-bigint",
"percent-encoding",
"pin-project-lite",
"rustls 0.23.27",
"rustls 0.23.18",
"rustls-native-certs 0.8.0",
"ryu",
"sha1_smol",
"socket2",
"tokio",
"tokio-rustls 0.26.2",
"tokio-rustls 0.26.0",
"tokio-util",
"url",
]
@@ -5929,15 +5926,15 @@ dependencies = [
[[package]]
name = "rustls"
version = "0.23.27"
version = "0.23.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "730944ca083c1c233a75c09f199e973ca499344a2b7ba9e755c457e86fb4a321"
checksum = "9c9cc1d47e243d655ace55ed38201c19ae02c148ae56412ab8750e8f0166ab7f"
dependencies = [
"log",
"once_cell",
"ring",
"rustls-pki-types",
"rustls-webpki 0.103.3",
"rustls-webpki 0.102.8",
"subtle",
"zeroize",
]
@@ -6026,17 +6023,6 @@ dependencies = [
"untrusted",
]
[[package]]
name = "rustls-webpki"
version = "0.103.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e4a72fe2bcf7a6ac6fd7d0b9e5cb68aeb7d4c0a0271730218b3e92d43b4eb435"
dependencies = [
"ring",
"rustls-pki-types",
"untrusted",
]
[[package]]
name = "rustversion"
version = "1.0.12"
@@ -6088,7 +6074,7 @@ dependencies = [
"regex",
"remote_storage",
"reqwest",
"rustls 0.23.27",
"rustls 0.23.18",
"safekeeper_api",
"safekeeper_client",
"scopeguard",
@@ -6105,7 +6091,7 @@ dependencies = [
"tokio",
"tokio-io-timeout",
"tokio-postgres",
"tokio-rustls 0.26.2",
"tokio-rustls 0.26.0",
"tokio-stream",
"tokio-tar",
"tokio-util",
@@ -6277,7 +6263,7 @@ checksum = "255914a8e53822abd946e2ce8baa41d4cded6b8e938913b7f7b9da5b7ab44335"
dependencies = [
"httpdate",
"reqwest",
"rustls 0.23.27",
"rustls 0.23.18",
"sentry-backtrace",
"sentry-contexts",
"sentry-core",
@@ -6706,11 +6692,11 @@ dependencies = [
"metrics",
"once_cell",
"parking_lot 0.12.1",
"prost 0.13.5",
"rustls 0.23.27",
"prost 0.13.3",
"rustls 0.23.18",
"tokio",
"tokio-rustls 0.26.2",
"tonic 0.13.1",
"tokio-rustls 0.26.0",
"tonic",
"tonic-build",
"tracing",
"utils",
@@ -6752,7 +6738,7 @@ dependencies = [
"regex",
"reqwest",
"routerify",
"rustls 0.23.27",
"rustls 0.23.18",
"rustls-native-certs 0.8.0",
"safekeeper_api",
"safekeeper_client",
@@ -6767,7 +6753,7 @@ dependencies = [
"tokio",
"tokio-postgres",
"tokio-postgres-rustls",
"tokio-rustls 0.26.2",
"tokio-rustls 0.26.0",
"tokio-util",
"tracing",
"utils",
@@ -6805,7 +6791,7 @@ dependencies = [
"postgres_ffi",
"remote_storage",
"reqwest",
"rustls 0.23.27",
"rustls 0.23.18",
"rustls-native-certs 0.8.0",
"serde",
"serde_json",
@@ -7339,10 +7325,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04fb792ccd6bbcd4bba408eb8a292f70fc4a3589e5d793626f45190e6454b6ab"
dependencies = [
"ring",
"rustls 0.23.27",
"rustls 0.23.18",
"tokio",
"tokio-postgres",
"tokio-rustls 0.26.2",
"tokio-rustls 0.26.0",
"x509-certificate",
]
@@ -7386,11 +7372,12 @@ dependencies = [
[[package]]
name = "tokio-rustls"
version = "0.26.2"
version = "0.26.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e727b36a1a0e8b74c376ac2211e40c2c8af09fb4013c60d910495810f008e9b"
checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4"
dependencies = [
"rustls 0.23.27",
"rustls 0.23.18",
"rustls-pki-types",
"tokio",
]
@@ -7488,7 +7475,7 @@ version = "0.22.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f21c7aaf97f1bd9ca9d4f9e73b0a6c74bd5afef56f2bc931943a6e1c37e04e38"
dependencies = [
"indexmap 2.9.0",
"indexmap 2.0.1",
"serde",
"serde_spanned",
"toml_datetime",
@@ -7507,41 +7494,18 @@ dependencies = [
"http 1.1.0",
"http-body 1.0.0",
"http-body-util",
"percent-encoding",
"pin-project",
"prost 0.13.5",
"tokio-stream",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "tonic"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e581ba15a835f4d9ea06c55ab1bd4dce26fc53752c69a04aac00703bfb49ba9"
dependencies = [
"async-trait",
"axum",
"base64 0.22.1",
"bytes",
"h2 0.4.4",
"http 1.1.0",
"http-body 1.0.0",
"http-body-util",
"hyper 1.4.1",
"hyper-timeout",
"hyper-util",
"percent-encoding",
"pin-project",
"prost 0.13.5",
"prost 0.13.3",
"rustls-native-certs 0.8.0",
"socket2",
"rustls-pemfile 2.1.1",
"tokio",
"tokio-rustls 0.26.2",
"tokio-rustls 0.26.0",
"tokio-stream",
"tower 0.5.2",
"tower 0.4.13",
"tower-layer",
"tower-service",
"tracing",
@@ -7549,9 +7513,9 @@ dependencies = [
[[package]]
name = "tonic-build"
version = "0.13.1"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eac6f67be712d12f0b41328db3137e0d0757645d8904b4cb7d51cd9c2279e847"
checksum = "9557ce109ea773b399c9b9e5dca39294110b74f1f342cb347a80d1fce8c26a11"
dependencies = [
"prettyplease",
"proc-macro2",
@@ -7561,19 +7525,6 @@ dependencies = [
"syn 2.0.100",
]
[[package]]
name = "tonic-reflection"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9687bd5bfeafebdded2356950f278bba8226f0b32109537c4253406e09aafe1"
dependencies = [
"prost 0.13.5",
"prost-types 0.13.3",
"tokio",
"tokio-stream",
"tonic 0.13.1",
]
[[package]]
name = "tower"
version = "0.4.13"
@@ -7582,11 +7533,16 @@ checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c"
dependencies = [
"futures-core",
"futures-util",
"indexmap 1.9.3",
"pin-project",
"pin-project-lite",
"rand 0.8.5",
"slab",
"tokio",
"tokio-util",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
@@ -7597,12 +7553,9 @@ checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9"
dependencies = [
"futures-core",
"futures-util",
"indexmap 2.9.0",
"pin-project-lite",
"slab",
"sync_wrapper 1.0.1",
"tokio",
"tokio-util",
"tower-layer",
"tower-service",
"tracing",
@@ -7930,7 +7883,7 @@ dependencies = [
"base64 0.22.1",
"log",
"once_cell",
"rustls 0.23.27",
"rustls 0.23.18",
"rustls-pki-types",
"url",
"webpki-roots",
@@ -8125,7 +8078,7 @@ dependencies = [
"pageserver_api",
"postgres_ffi",
"pprof",
"prost 0.13.5",
"prost 0.13.3",
"remote_storage",
"serde",
"serde_json",
@@ -8545,8 +8498,6 @@ dependencies = [
"ahash",
"anstream",
"anyhow",
"axum",
"axum-core",
"base64 0.13.1",
"base64 0.21.7",
"base64ct",
@@ -8583,7 +8534,8 @@ dependencies = [
"hyper 0.14.30",
"hyper 1.4.1",
"hyper-util",
"indexmap 2.9.0",
"indexmap 1.9.3",
"indexmap 2.0.1",
"itertools 0.12.1",
"lazy_static",
"libc",
@@ -8605,16 +8557,16 @@ dependencies = [
"percent-encoding",
"prettyplease",
"proc-macro2",
"prost 0.13.5",
"prost 0.13.3",
"quote",
"rand 0.8.5",
"regex",
"regex-automata 0.4.3",
"regex-syntax 0.8.2",
"reqwest",
"rustls 0.23.27",
"rustls 0.23.18",
"rustls-pki-types",
"rustls-webpki 0.103.3",
"rustls-webpki 0.102.8",
"scopeguard",
"sec1 0.7.3",
"serde",
@@ -8632,11 +8584,12 @@ dependencies = [
"time",
"time-macros",
"tokio",
"tokio-rustls 0.26.2",
"tokio-rustls 0.26.0",
"tokio-stream",
"tokio-util",
"toml_edit",
"tower 0.5.2",
"tonic",
"tower 0.4.13",
"tracing",
"tracing-core",
"tracing-log",

View File

@@ -149,7 +149,7 @@ pin-project-lite = "0.2"
pprof = { version = "0.14", features = ["criterion", "flamegraph", "frame-pointer", "prost-codec"] }
procfs = "0.16"
prometheus = {version = "0.13", default-features=false, features = ["process"]} # removes protobuf dependency
prost = "0.13.5"
prost = "0.13"
rand = "0.8"
redis = { version = "0.29.2", features = ["tokio-rustls-comp", "keep-alive"] }
regex = "1.10.2"
@@ -199,8 +199,7 @@ tokio-tar = "0.3"
tokio-util = { version = "0.7.10", features = ["io", "rt"] }
toml = "0.8"
toml_edit = "0.22"
tonic = { version = "0.13.1", default-features = false, features = ["channel", "codegen", "prost", "router", "server", "tls-ring", "tls-native-roots"] }
tonic-reflection = { version = "0.13.1", features = ["server"] }
tonic = {version = "0.12.3", default-features = false, features = ["channel", "tls", "tls-roots"]}
tower = { version = "0.5.2", default-features = false }
tower-http = { version = "0.6.2", features = ["auth", "request-id", "trace"] }
@@ -281,7 +280,7 @@ criterion = "0.5.1"
rcgen = "0.13"
rstest = "0.18"
camino-tempfile = "1.0.2"
tonic-build = "0.13.1"
tonic-build = "0.12"
[patch.crates-io]

View File

@@ -155,7 +155,7 @@ RUN set -e \
# Keep the version the same as in compute/compute-node.Dockerfile and
# test_runner/regress/test_compute_metrics.py.
ENV SQL_EXPORTER_VERSION=0.17.3
ENV SQL_EXPORTER_VERSION=0.17.0
RUN curl -fsSL \
"https://github.com/burningalchemist/sql_exporter/releases/download/${SQL_EXPORTER_VERSION}/sql_exporter-${SQL_EXPORTER_VERSION}.linux-$(case "$(uname -m)" in x86_64) echo amd64;; aarch64) echo arm64;; esac).tar.gz" \
--output sql_exporter.tar.gz \

View File

@@ -582,38 +582,6 @@ RUN make -j $(getconf _NPROCESSORS_ONLN) && \
make -j $(getconf _NPROCESSORS_ONLN) install && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/hypopg.control
#########################################################################################
#
# Layer "online_advisor-build"
# compile online_advisor extension
#
#########################################################################################
FROM build-deps AS online_advisor-src
ARG PG_VERSION
# online_advisor supports all Postgres version starting from PG14, but prior to PG17 has to be included in preload_shared_libraries
# last release 1.0 - May 15, 2025
WORKDIR /ext-src
RUN case "${PG_VERSION:?}" in \
"v17") \
;; \
*) \
echo "skipping the version of online_advistor for $PG_VERSION" && exit 0 \
;; \
esac && \
wget https://github.com/knizhnik/online_advisor/archive/refs/tags/1.0.tar.gz -O online_advisor.tar.gz && \
echo "059b7d9e5a90013a58bdd22e9505b88406ce05790675eb2d8434e5b215652d54 online_advisor.tar.gz" | sha256sum --check && \
mkdir online_advisor-src && cd online_advisor-src && tar xzf ../online_advisor.tar.gz --strip-components=1 -C .
FROM pg-build AS online_advisor-build
COPY --from=online_advisor-src /ext-src/ /ext-src/
WORKDIR /ext-src/
RUN if [ -d online_advisor-src ]; then \
cd online_advisor-src && \
make -j install && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/online_advisor.control; \
fi
#########################################################################################
#
# Layer "pg_hashids-build"
@@ -1680,7 +1648,6 @@ COPY --from=pg_jsonschema-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg_graphql-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg_tiktoken-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=hypopg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=online_advisor-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg_hashids-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=rum-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pgtap-build /usr/local/pgsql/ /usr/local/pgsql/
@@ -1784,17 +1751,17 @@ ARG TARGETARCH
RUN if [ "$TARGETARCH" = "amd64" ]; then\
postgres_exporter_sha256='59aa4a7bb0f7d361f5e05732f5ed8c03cc08f78449cef5856eadec33a627694b';\
pgbouncer_exporter_sha256='c9f7cf8dcff44f0472057e9bf52613d93f3ffbc381ad7547a959daa63c5e84ac';\
sql_exporter_sha256='9a41127a493e8bfebfe692bf78c7ed2872a58a3f961ee534d1b0da9ae584aaab';\
sql_exporter_sha256='38e439732bbf6e28ca4a94d7bc3686d3fa1abdb0050773d5617a9efdb9e64d08';\
else\
postgres_exporter_sha256='d1dedea97f56c6d965837bfd1fbb3e35a3b4a4556f8cccee8bd513d8ee086124';\
pgbouncer_exporter_sha256='217c4afd7e6492ae904055bc14fe603552cf9bac458c063407e991d68c519da3';\
sql_exporter_sha256='530e6afc77c043497ed965532c4c9dfa873bc2a4f0b3047fad367715c0081d6a';\
sql_exporter_sha256='11918b00be6e2c3a67564adfdb2414fdcbb15a5db76ea17d1d1a944237a893c6';\
fi\
&& curl -sL https://github.com/prometheus-community/postgres_exporter/releases/download/v0.17.1/postgres_exporter-0.17.1.linux-${TARGETARCH}.tar.gz\
| tar xzf - --strip-components=1 -C.\
&& curl -sL https://github.com/prometheus-community/pgbouncer_exporter/releases/download/v0.10.2/pgbouncer_exporter-0.10.2.linux-${TARGETARCH}.tar.gz\
| tar xzf - --strip-components=1 -C.\
&& curl -sL https://github.com/burningalchemist/sql_exporter/releases/download/0.17.3/sql_exporter-0.17.3.linux-${TARGETARCH}.tar.gz\
&& curl -sL https://github.com/burningalchemist/sql_exporter/releases/download/0.17.0/sql_exporter-0.17.0.linux-${TARGETARCH}.tar.gz\
| tar xzf - --strip-components=1 -C.\
&& echo "${postgres_exporter_sha256} postgres_exporter" | sha256sum -c -\
&& echo "${pgbouncer_exporter_sha256} pgbouncer_exporter" | sha256sum -c -\
@@ -1856,7 +1823,6 @@ COPY --from=pgjwt-src /ext-src/ /ext-src/
COPY --from=pg_graphql-src /ext-src/ /ext-src/
#COPY --from=pg_tiktoken-src /ext-src/ /ext-src/
COPY --from=hypopg-src /ext-src/ /ext-src/
COPY --from=online_advisor-src /ext-src/ /ext-src/
COPY --from=pg_hashids-src /ext-src/ /ext-src/
COPY --from=rum-src /ext-src/ /ext-src/
COPY --from=pgtap-src /ext-src/ /ext-src/

View File

@@ -27,40 +27,6 @@ fn get_rsyslog_pid() -> Option<String> {
}
}
fn wait_for_rsyslog_pid() -> Result<String, anyhow::Error> {
const MAX_WAIT: Duration = Duration::from_secs(5);
const INITIAL_SLEEP: Duration = Duration::from_millis(2);
let mut sleep_duration = INITIAL_SLEEP;
let start = std::time::Instant::now();
let mut attempts = 1;
for attempt in 1.. {
attempts = attempt;
match get_rsyslog_pid() {
Some(pid) => return Ok(pid),
None => {
if start.elapsed() >= MAX_WAIT {
break;
}
info!(
"rsyslogd is not running, attempt {}. Sleeping for {} ms",
attempt,
sleep_duration.as_millis()
);
std::thread::sleep(sleep_duration);
sleep_duration *= 2;
}
}
}
Err(anyhow::anyhow!(
"rsyslogd is not running after waiting for {} seconds and {} attempts",
attempts,
start.elapsed().as_secs()
))
}
// Restart rsyslogd to apply the new configuration.
// This is necessary, because there is no other way to reload the rsyslog configuration.
//
@@ -70,14 +36,14 @@ fn wait_for_rsyslog_pid() -> Result<String, anyhow::Error> {
// TODO: test it properly
//
fn restart_rsyslog() -> Result<()> {
let old_pid = get_rsyslog_pid().context("rsyslogd is not running")?;
info!("rsyslogd is running with pid: {}, restart it", old_pid);
// kill it to restart
let _ = Command::new("pkill")
.arg("rsyslogd")
.output()
.context("Failed to restart rsyslogd")?;
// ensure rsyslogd is running
wait_for_rsyslog_pid()?;
.context("Failed to stop rsyslogd")?;
Ok(())
}
@@ -165,11 +131,15 @@ pub fn configure_postgres_logs_export(conf: PostgresLogsRsyslogConfig) -> Result
return Ok(());
}
// Nothing to configure
// When new config is empty we can simply remove the configuration file.
if new_config.is_empty() {
// When the configuration is removed, PostgreSQL will stop sending data
// to the files watched by rsyslog, so restarting rsyslog is more effort
// than just ignoring this change.
info!("removing rsyslog config file: {}", POSTGRES_LOGS_CONF_PATH);
match std::fs::remove_file(POSTGRES_LOGS_CONF_PATH) {
Ok(_) => {}
Err(err) if err.kind() == ErrorKind::NotFound => {}
Err(err) => return Err(err.into()),
}
restart_rsyslog()?;
return Ok(());
}

View File

@@ -2,10 +2,8 @@
[pageserver]
listen_pg_addr = '127.0.0.1:64000'
listen_http_addr = '127.0.0.1:9898'
listen_grpc_addr = '127.0.0.1:51051'
pg_auth_type = 'Trust'
http_auth_type = 'Trust'
grpc_auth_type = 'Trust'
[[safekeepers]]
id = 1

View File

@@ -4,10 +4,8 @@
id=1
listen_pg_addr = '127.0.0.1:64000'
listen_http_addr = '127.0.0.1:9898'
listen_grpc_addr = '127.0.0.1:51051'
pg_auth_type = 'Trust'
http_auth_type = 'Trust'
grpc_auth_type = 'Trust'
[[safekeepers]]
id = 1

View File

@@ -32,7 +32,6 @@ use control_plane::storage_controller::{
};
use nix::fcntl::{Flock, FlockArg};
use pageserver_api::config::{
DEFAULT_GRPC_LISTEN_PORT as DEFAULT_PAGESERVER_GRPC_PORT,
DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT,
DEFAULT_PG_LISTEN_PORT as DEFAULT_PAGESERVER_PG_PORT,
};
@@ -1008,16 +1007,13 @@ fn handle_init(args: &InitCmdArgs) -> anyhow::Result<LocalEnv> {
let pageserver_id = NodeId(DEFAULT_PAGESERVER_ID.0 + i as u64);
let pg_port = DEFAULT_PAGESERVER_PG_PORT + i;
let http_port = DEFAULT_PAGESERVER_HTTP_PORT + i;
let grpc_port = DEFAULT_PAGESERVER_GRPC_PORT + i;
NeonLocalInitPageserverConf {
id: pageserver_id,
listen_pg_addr: format!("127.0.0.1:{pg_port}"),
listen_http_addr: format!("127.0.0.1:{http_port}"),
listen_https_addr: None,
listen_grpc_addr: Some(format!("127.0.0.1:{grpc_port}")),
pg_auth_type: AuthType::Trust,
http_auth_type: AuthType::Trust,
grpc_auth_type: AuthType::Trust,
other: Default::default(),
// Typical developer machines use disks with slow fsync, and we don't care
// about data integrity: disable disk syncs.

View File

@@ -278,10 +278,8 @@ pub struct PageServerConf {
pub listen_pg_addr: String,
pub listen_http_addr: String,
pub listen_https_addr: Option<String>,
pub listen_grpc_addr: Option<String>,
pub pg_auth_type: AuthType,
pub http_auth_type: AuthType,
pub grpc_auth_type: AuthType,
pub no_sync: bool,
}
@@ -292,10 +290,8 @@ impl Default for PageServerConf {
listen_pg_addr: String::new(),
listen_http_addr: String::new(),
listen_https_addr: None,
listen_grpc_addr: None,
pg_auth_type: AuthType::Trust,
http_auth_type: AuthType::Trust,
grpc_auth_type: AuthType::Trust,
no_sync: false,
}
}
@@ -310,10 +306,8 @@ pub struct NeonLocalInitPageserverConf {
pub listen_pg_addr: String,
pub listen_http_addr: String,
pub listen_https_addr: Option<String>,
pub listen_grpc_addr: Option<String>,
pub pg_auth_type: AuthType,
pub http_auth_type: AuthType,
pub grpc_auth_type: AuthType,
#[serde(default, skip_serializing_if = "std::ops::Not::not")]
pub no_sync: bool,
#[serde(flatten)]
@@ -327,10 +321,8 @@ impl From<&NeonLocalInitPageserverConf> for PageServerConf {
listen_pg_addr,
listen_http_addr,
listen_https_addr,
listen_grpc_addr,
pg_auth_type,
http_auth_type,
grpc_auth_type,
no_sync,
other: _,
} = conf;
@@ -339,9 +331,7 @@ impl From<&NeonLocalInitPageserverConf> for PageServerConf {
listen_pg_addr: listen_pg_addr.clone(),
listen_http_addr: listen_http_addr.clone(),
listen_https_addr: listen_https_addr.clone(),
listen_grpc_addr: listen_grpc_addr.clone(),
pg_auth_type: *pg_auth_type,
grpc_auth_type: *grpc_auth_type,
http_auth_type: *http_auth_type,
no_sync: *no_sync,
}
@@ -717,10 +707,8 @@ impl LocalEnv {
listen_pg_addr: String,
listen_http_addr: String,
listen_https_addr: Option<String>,
listen_grpc_addr: Option<String>,
pg_auth_type: AuthType,
http_auth_type: AuthType,
grpc_auth_type: AuthType,
#[serde(default)]
no_sync: bool,
}
@@ -744,10 +732,8 @@ impl LocalEnv {
listen_pg_addr,
listen_http_addr,
listen_https_addr,
listen_grpc_addr,
pg_auth_type,
http_auth_type,
grpc_auth_type,
no_sync,
} = config_toml;
let IdentityTomlSubset {
@@ -764,10 +750,8 @@ impl LocalEnv {
listen_pg_addr,
listen_http_addr,
listen_https_addr,
listen_grpc_addr,
pg_auth_type,
http_auth_type,
grpc_auth_type,
no_sync,
};
pageservers.push(conf);

View File

@@ -129,9 +129,7 @@ impl PageServerNode {
));
}
if [conf.http_auth_type, conf.pg_auth_type, conf.grpc_auth_type]
.contains(&AuthType::NeonJWT)
{
if conf.http_auth_type != AuthType::Trust || conf.pg_auth_type != AuthType::Trust {
// Keys are generated in the toplevel repo dir, pageservers' workdirs
// are one level below that, so refer to keys with ../
overrides.push("auth_validation_public_key_path='../auth_public_key.pem'".to_owned());
@@ -553,11 +551,6 @@ impl PageServerNode {
.map(|x| x.parse::<usize>())
.transpose()
.context("Falied to parse 'relsize_snapshot_cache_capacity' as integer")?,
basebackup_cache_enabled: settings
.remove("basebackup_cache_enabled")
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'basebackup_cache_enabled' as bool")?,
};
if !settings.is_empty() {
bail!("Unrecognized tenant settings: {settings:?}")

View File

@@ -8,9 +8,6 @@ pub const DEFAULT_PG_LISTEN_PORT: u16 = 64000;
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 9898;
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");
// TODO: gRPC is disabled by default for now, but the port is used in neon_local.
pub const DEFAULT_GRPC_LISTEN_PORT: u16 = 51051; // storage-broker already uses 50051
pub const DEFAULT_GRPC_LISTEN_TLS: bool = false; // TODO: enable by default?
use std::collections::HashMap;
use std::num::{NonZeroU64, NonZeroUsize};
@@ -107,8 +104,6 @@ pub struct ConfigToml {
pub listen_pg_addr: String,
pub listen_http_addr: String,
pub listen_https_addr: Option<String>,
pub listen_grpc_addr: Option<String>,
pub listen_grpc_tls: bool,
pub ssl_key_file: Utf8PathBuf,
pub ssl_cert_file: Utf8PathBuf,
#[serde(with = "humantime_serde")]
@@ -128,7 +123,6 @@ pub struct ConfigToml {
pub http_auth_type: AuthType,
#[serde_as(as = "serde_with::DisplayFromStr")]
pub pg_auth_type: AuthType,
pub grpc_auth_type: AuthType,
pub auth_validation_public_key_path: Option<Utf8PathBuf>,
pub remote_storage: Option<RemoteStorageConfig>,
pub tenant_config: TenantConfigToml,
@@ -189,8 +183,6 @@ pub struct ConfigToml {
pub enable_tls_page_service_api: bool,
pub dev_mode: bool,
pub timeline_import_config: TimelineImportConfig,
#[serde(skip_serializing_if = "Option::is_none")]
pub basebackup_cache_config: Option<BasebackupCacheConfig>,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
@@ -316,26 +308,6 @@ pub struct TimelineImportConfig {
pub import_job_checkpoint_threshold: NonZeroUsize,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(default)]
pub struct BasebackupCacheConfig {
#[serde(with = "humantime_serde")]
pub cleanup_period: Duration,
// FIXME: Support max_size_bytes.
// pub max_size_bytes: usize,
pub max_size_entries: i64,
}
impl Default for BasebackupCacheConfig {
fn default() -> Self {
Self {
cleanup_period: Duration::from_secs(60),
// max_size_bytes: 1024 * 1024 * 1024, // 1 GiB
max_size_entries: 1000,
}
}
}
pub mod statvfs {
pub mod mock {
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
@@ -519,14 +491,8 @@ pub struct TenantConfigToml {
/// Tenant level performance sampling ratio override. Controls the ratio of get page requests
/// that will get perf sampling for the tenant.
pub sampling_ratio: Option<Ratio>,
/// Capacity of relsize snapshot cache (used by replicas).
pub relsize_snapshot_cache_capacity: usize,
/// Enable preparing basebackup on XLOG_CHECKPOINT_SHUTDOWN and using it in basebackup requests.
// FIXME: Remove skip_serializing_if when the feature is stable.
#[serde(skip_serializing_if = "std::ops::Not::not")]
pub basebackup_cache_enabled: bool,
}
pub mod defaults {
@@ -594,8 +560,6 @@ impl Default for ConfigToml {
listen_pg_addr: (DEFAULT_PG_LISTEN_ADDR.to_string()),
listen_http_addr: (DEFAULT_HTTP_LISTEN_ADDR.to_string()),
listen_https_addr: (None),
listen_grpc_addr: None, // TODO: default to 127.0.0.1:51051
listen_grpc_tls: DEFAULT_GRPC_LISTEN_TLS,
ssl_key_file: Utf8PathBuf::from(DEFAULT_SSL_KEY_FILE),
ssl_cert_file: Utf8PathBuf::from(DEFAULT_SSL_CERT_FILE),
ssl_cert_reload_period: Duration::from_secs(60),
@@ -612,7 +576,6 @@ impl Default for ConfigToml {
pg_distrib_dir: None, // Utf8PathBuf::from("./pg_install"), // TODO: formely, this was std::env::current_dir()
http_auth_type: (AuthType::Trust),
pg_auth_type: (AuthType::Trust),
grpc_auth_type: (AuthType::Trust),
auth_validation_public_key_path: (None),
remote_storage: None,
broker_endpoint: (storage_broker::DEFAULT_ENDPOINT
@@ -703,7 +666,6 @@ impl Default for ConfigToml {
import_job_soft_size_limit: NonZeroUsize::new(1024 * 1024 * 1024).unwrap(),
import_job_checkpoint_threshold: NonZeroUsize::new(128).unwrap(),
},
basebackup_cache_config: None,
}
}
}
@@ -829,7 +791,6 @@ impl Default for TenantConfigToml {
gc_compaction_ratio_percent: DEFAULT_GC_COMPACTION_RATIO_PERCENT,
sampling_ratio: None,
relsize_snapshot_cache_capacity: DEFAULT_RELSIZE_SNAPSHOT_CACHE_CAPACITY,
basebackup_cache_enabled: false,
}
}
}

View File

@@ -632,8 +632,6 @@ pub struct TenantConfigPatch {
pub sampling_ratio: FieldPatch<Option<Ratio>>,
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub relsize_snapshot_cache_capacity: FieldPatch<usize>,
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub basebackup_cache_enabled: FieldPatch<bool>,
}
/// Like [`crate::config::TenantConfigToml`], but preserves the information
@@ -766,9 +764,6 @@ pub struct TenantConfig {
#[serde(skip_serializing_if = "Option::is_none")]
pub relsize_snapshot_cache_capacity: Option<usize>,
#[serde(skip_serializing_if = "Option::is_none")]
pub basebackup_cache_enabled: Option<bool>,
}
impl TenantConfig {
@@ -815,7 +810,6 @@ impl TenantConfig {
mut gc_compaction_ratio_percent,
mut sampling_ratio,
mut relsize_snapshot_cache_capacity,
mut basebackup_cache_enabled,
} = self;
patch.checkpoint_distance.apply(&mut checkpoint_distance);
@@ -920,9 +914,6 @@ impl TenantConfig {
patch
.relsize_snapshot_cache_capacity
.apply(&mut relsize_snapshot_cache_capacity);
patch
.basebackup_cache_enabled
.apply(&mut basebackup_cache_enabled);
Ok(Self {
checkpoint_distance,
@@ -963,7 +954,6 @@ impl TenantConfig {
gc_compaction_ratio_percent,
sampling_ratio,
relsize_snapshot_cache_capacity,
basebackup_cache_enabled,
})
}
@@ -1075,9 +1065,6 @@ impl TenantConfig {
relsize_snapshot_cache_capacity: self
.relsize_snapshot_cache_capacity
.unwrap_or(global_conf.relsize_snapshot_cache_capacity),
basebackup_cache_enabled: self
.basebackup_cache_enabled
.unwrap_or(global_conf.basebackup_cache_enabled),
}
}
}

View File

@@ -25,7 +25,6 @@ where
Ok(())
}
#[derive(Debug)]
pub enum BindError {
Conversion(Box<dyn Error + marker::Sync + Send>),
Serialization(io::Error),
@@ -289,12 +288,6 @@ pub fn sync(buf: &mut BytesMut) {
write_body(buf, |_| Ok::<(), io::Error>(())).unwrap();
}
#[inline]
pub fn flush(buf: &mut BytesMut) {
buf.put_u8(b'H');
write_body(buf, |_| Ok::<(), io::Error>(())).unwrap();
}
#[inline]
pub fn terminate(buf: &mut BytesMut) {
buf.put_u8(b'X');

View File

@@ -9,6 +9,7 @@ use std::error::Error;
use std::fmt;
use std::sync::Arc;
use bytes::BytesMut;
use fallible_iterator::FallibleIterator;
#[doc(inline)]
pub use postgres_protocol2::Oid;
@@ -26,6 +27,41 @@ macro_rules! accepts {
)
}
/// Generates an implementation of `ToSql::to_sql_checked`.
///
/// All `ToSql` implementations should use this macro.
macro_rules! to_sql_checked {
() => {
fn to_sql_checked(
&self,
ty: &$crate::Type,
out: &mut $crate::private::BytesMut,
) -> ::std::result::Result<
$crate::IsNull,
Box<dyn ::std::error::Error + ::std::marker::Sync + ::std::marker::Send>,
> {
$crate::__to_sql_checked(self, ty, out)
}
};
}
// WARNING: this function is not considered part of this crate's public API.
// It is subject to change at any time.
#[doc(hidden)]
pub fn __to_sql_checked<T>(
v: &T,
ty: &Type,
out: &mut BytesMut,
) -> Result<IsNull, Box<dyn Error + Sync + Send>>
where
T: ToSql,
{
if !T::accepts(ty) {
return Err(Box::new(WrongType::new::<T>(ty.clone())));
}
v.to_sql(ty, out)
}
// mod pg_lsn;
#[doc(hidden)]
pub mod private;
@@ -106,7 +142,7 @@ pub enum Kind {
/// An array type along with the type of its elements.
Array(Type),
/// A range type along with the type of its elements.
Range(Oid),
Range(Type),
/// A multirange type along with the type of its elements.
Multirange(Type),
/// A domain type along with its underlying type.
@@ -341,6 +377,43 @@ pub enum IsNull {
No,
}
/// A trait for types that can be converted into Postgres values.
pub trait ToSql: fmt::Debug {
/// Converts the value of `self` into the binary format of the specified
/// Postgres `Type`, appending it to `out`.
///
/// The caller of this method is responsible for ensuring that this type
/// is compatible with the Postgres `Type`.
///
/// The return value indicates if this value should be represented as
/// `NULL`. If this is the case, implementations **must not** write
/// anything to `out`.
fn to_sql(&self, ty: &Type, out: &mut BytesMut) -> Result<IsNull, Box<dyn Error + Sync + Send>>
where
Self: Sized;
/// Determines if a value of this type can be converted to the specified
/// Postgres `Type`.
fn accepts(ty: &Type) -> bool
where
Self: Sized;
/// An adaptor method used internally by Rust-Postgres.
///
/// *All* implementations of this method should be generated by the
/// `to_sql_checked!()` macro.
fn to_sql_checked(
&self,
ty: &Type,
out: &mut BytesMut,
) -> Result<IsNull, Box<dyn Error + Sync + Send>>;
/// Specify the encode format
fn encode_format(&self, _ty: &Type) -> Format {
Format::Binary
}
}
/// Supported Postgres message format types
///
/// Using Text format in a message assumes a Postgres `SERVER_ENCODING` of `UTF8`
@@ -351,3 +424,52 @@ pub enum Format {
/// Compact, typed binary format
Binary,
}
impl ToSql for &str {
fn to_sql(&self, ty: &Type, w: &mut BytesMut) -> Result<IsNull, Box<dyn Error + Sync + Send>> {
match *ty {
ref ty if ty.name() == "ltree" => types::ltree_to_sql(self, w),
ref ty if ty.name() == "lquery" => types::lquery_to_sql(self, w),
ref ty if ty.name() == "ltxtquery" => types::ltxtquery_to_sql(self, w),
_ => types::text_to_sql(self, w),
}
Ok(IsNull::No)
}
fn accepts(ty: &Type) -> bool {
match *ty {
Type::VARCHAR | Type::TEXT | Type::BPCHAR | Type::NAME | Type::UNKNOWN => true,
ref ty
if (ty.name() == "citext"
|| ty.name() == "ltree"
|| ty.name() == "lquery"
|| ty.name() == "ltxtquery") =>
{
true
}
_ => false,
}
}
to_sql_checked!();
}
macro_rules! simple_to {
($t:ty, $f:ident, $($expected:ident),+) => {
impl ToSql for $t {
fn to_sql(&self,
_: &Type,
w: &mut BytesMut)
-> Result<IsNull, Box<dyn Error + Sync + Send>> {
types::$f(*self, w);
Ok(IsNull::No)
}
accepts!($($expected),+);
to_sql_checked!();
}
}
}
simple_to!(u32, oid_to_sql, OID);

View File

@@ -393,7 +393,7 @@ impl Inner {
}
}
pub const fn const_oid(&self) -> Oid {
pub fn oid(&self) -> Oid {
match *self {
Inner::Bool => 16,
Inner::Bytea => 17,
@@ -580,14 +580,7 @@ impl Inner {
Inner::TstzmultiRangeArray => 6153,
Inner::DatemultiRangeArray => 6155,
Inner::Int8multiRangeArray => 6157,
Inner::Other(_) => u32::MAX,
}
}
pub fn oid(&self) -> Oid {
match *self {
Inner::Other(ref u) => u.oid,
_ => self.const_oid(),
}
}
@@ -734,17 +727,17 @@ impl Inner {
Inner::JsonbArray => &Kind::Array(Type(Inner::Jsonb)),
Inner::AnyRange => &Kind::Pseudo,
Inner::EventTrigger => &Kind::Pseudo,
Inner::Int4Range => &const { Kind::Range(Inner::Int4.const_oid()) },
Inner::Int4Range => &Kind::Range(Type(Inner::Int4)),
Inner::Int4RangeArray => &Kind::Array(Type(Inner::Int4Range)),
Inner::NumRange => &const { Kind::Range(Inner::Numeric.const_oid()) },
Inner::NumRange => &Kind::Range(Type(Inner::Numeric)),
Inner::NumRangeArray => &Kind::Array(Type(Inner::NumRange)),
Inner::TsRange => &const { Kind::Range(Inner::Timestamp.const_oid()) },
Inner::TsRange => &Kind::Range(Type(Inner::Timestamp)),
Inner::TsRangeArray => &Kind::Array(Type(Inner::TsRange)),
Inner::TstzRange => &const { Kind::Range(Inner::Timestamptz.const_oid()) },
Inner::TstzRange => &Kind::Range(Type(Inner::Timestamptz)),
Inner::TstzRangeArray => &Kind::Array(Type(Inner::TstzRange)),
Inner::DateRange => &const { Kind::Range(Inner::Date.const_oid()) },
Inner::DateRange => &Kind::Range(Type(Inner::Date)),
Inner::DateRangeArray => &Kind::Array(Type(Inner::DateRange)),
Inner::Int8Range => &const { Kind::Range(Inner::Int8.const_oid()) },
Inner::Int8Range => &Kind::Range(Type(Inner::Int8)),
Inner::Int8RangeArray => &Kind::Array(Type(Inner::Int8Range)),
Inner::Jsonpath => &Kind::Simple,
Inner::JsonpathArray => &Kind::Array(Type(Inner::Jsonpath)),

View File

@@ -18,8 +18,8 @@ use crate::query::RowStream;
use crate::simple_query::SimpleQueryStream;
use crate::types::{Oid, Type};
use crate::{
CancelToken, Error, ReadyForQueryStatus, SimpleQueryMessage, Transaction, TransactionBuilder,
query, simple_query,
CancelToken, Error, ReadyForQueryStatus, SimpleQueryMessage, Statement, Transaction,
TransactionBuilder, query, simple_query,
};
pub struct Responses {
@@ -59,7 +59,7 @@ impl Responses {
return Poll::Ready(res);
}
// get the next batch of messages.
// get the next back of messages.
match ready!(self.receiver.poll_recv(cx)) {
Some(messages) => self.cur = messages,
None => return Poll::Ready(Err(Error::closed())),
@@ -76,6 +76,11 @@ impl Responses {
/// (corresponding to the queries in the [crate::prepare] module).
#[derive(Default)]
pub(crate) struct CachedTypeInfo {
/// A statement for basic information for a type from its
/// OID. Corresponds to [TYPEINFO_QUERY](crate::prepare::TYPEINFO_QUERY) (or its
/// fallback).
pub(crate) typeinfo: Option<Statement>,
/// Cache of types already looked up.
pub(crate) types: HashMap<Oid, Type>,
}
@@ -89,74 +94,21 @@ pub struct InnerClient {
}
impl InnerClient {
pub fn start(&mut self) -> Result<PartialQuery, Error> {
self.responses.waiting += 1;
Ok(PartialQuery(Some(self)))
}
// pub fn send_with_sync<F>(&mut self, f: F) -> Result<&mut Responses, Error>
// where
// F: FnOnce(&mut BytesMut) -> Result<(), Error>,
// {
// self.start()?.send_with_sync(f)
// }
pub fn send_simple_query(&mut self, query: &str) -> Result<&mut Responses, Error> {
self.responses.waiting += 1;
self.buffer.clear();
// simple queries do not need sync.
frontend::query(query, &mut self.buffer).map_err(Error::encode)?;
let buf = self.buffer.split().freeze();
self.send_message(FrontendMessage::Raw(buf))
}
fn send_message(&mut self, messages: FrontendMessage) -> Result<&mut Responses, Error> {
pub fn send(&mut self, messages: FrontendMessage) -> Result<&mut Responses, Error> {
self.sender.send(messages).map_err(|_| Error::closed())?;
self.responses.waiting += 1;
Ok(&mut self.responses)
}
}
pub struct PartialQuery<'a>(Option<&'a mut InnerClient>);
impl Drop for PartialQuery<'_> {
fn drop(&mut self) {
if let Some(client) = self.0.take() {
client.buffer.clear();
frontend::sync(&mut client.buffer);
let buf = client.buffer.split().freeze();
let _ = client.send_message(FrontendMessage::Raw(buf));
}
}
}
impl<'a> PartialQuery<'a> {
pub fn send_with_flush<F>(&mut self, f: F) -> Result<&mut Responses, Error>
/// Call the given function with a buffer to be used when writing out
/// postgres commands.
pub fn with_buf<F, R>(&mut self, f: F) -> R
where
F: FnOnce(&mut BytesMut) -> Result<(), Error>,
F: FnOnce(&mut BytesMut) -> R,
{
let client = self.0.as_deref_mut().unwrap();
client.buffer.clear();
f(&mut client.buffer)?;
frontend::flush(&mut client.buffer);
let buf = client.buffer.split().freeze();
client.send_message(FrontendMessage::Raw(buf))
}
pub fn send_with_sync<F>(mut self, f: F) -> Result<&'a mut Responses, Error>
where
F: FnOnce(&mut BytesMut) -> Result<(), Error>,
{
let client = self.0.as_deref_mut().unwrap();
client.buffer.clear();
f(&mut client.buffer)?;
frontend::sync(&mut client.buffer);
let buf = client.buffer.split().freeze();
let _ = client.send_message(FrontendMessage::Raw(buf));
Ok(&mut self.0.take().unwrap().responses)
let r = f(&mut self.buffer);
self.buffer.clear();
r
}
}
@@ -182,6 +134,19 @@ pub struct Client {
secret_key: i32,
}
impl Drop for Client {
fn drop(&mut self) {
if let Some(stmt) = self.cached_typeinfo.typeinfo.take() {
let buf = self.inner.with_buf(|buf| {
frontend::close(b'S', stmt.name(), buf).unwrap();
frontend::sync(buf);
buf.split().freeze()
});
let _ = self.inner.send(FrontendMessage::Raw(buf));
}
}
}
impl Client {
pub(crate) fn new(
sender: mpsc::UnboundedSender<FrontendMessage>,
@@ -216,7 +181,7 @@ impl Client {
self.process_id
}
pub(crate) fn inner_mut(&mut self) -> &mut InnerClient {
pub(crate) fn inner(&mut self) -> &mut InnerClient {
&mut self.inner
}
@@ -232,13 +197,7 @@ impl Client {
I: IntoIterator<Item = Option<S>>,
I::IntoIter: ExactSizeIterator,
{
query::query_txt(
&mut self.inner,
&mut self.cached_typeinfo,
statement,
params,
)
.await
query::query_txt(&mut self.inner, statement, params).await
}
/// Executes a sequence of SQL statements using the simple query protocol, returning the resulting rows.
@@ -262,7 +221,7 @@ impl Client {
&mut self,
query: &str,
) -> Result<SimpleQueryStream, Error> {
simple_query::simple_query(self.inner_mut(), query).await
simple_query::simple_query(self.inner(), query).await
}
/// Executes a sequence of SQL statements using the simple query protocol.
@@ -276,10 +235,14 @@ impl Client {
/// functionality to safely embed that data in the request. Do not form statements via string concatenation and pass
/// them to this method!
pub async fn batch_execute(&mut self, query: &str) -> Result<ReadyForQueryStatus, Error> {
simple_query::batch_execute(self.inner_mut(), query).await
simple_query::batch_execute(self.inner(), query).await
}
pub async fn discard_all(&mut self) -> Result<ReadyForQueryStatus, Error> {
// clear the prepared statements that are about to be nuked from the postgres session
self.cached_typeinfo.typeinfo = None;
self.batch_execute("discard all").await
}
@@ -298,7 +261,11 @@ impl Client {
return;
}
let _ = self.client.inner.send_simple_query("ROLLBACK");
let buf = self.client.inner().with_buf(|buf| {
frontend::query("ROLLBACK", buf).unwrap();
buf.split().freeze()
});
let _ = self.client.inner().send(FrontendMessage::Raw(buf));
}
}
@@ -338,6 +305,11 @@ impl Client {
}
}
/// Query for type information
pub(crate) async fn get_type_inner(&mut self, oid: Oid) -> Result<Type, Error> {
crate::prepare::get_type(&mut self.inner, &mut self.cached_typeinfo, oid).await
}
/// Determines if the connection to the server has already closed.
///
/// In that case, all future queries will fail.

View File

@@ -1,6 +1,9 @@
#![allow(async_fn_in_trait)]
use postgres_protocol2::Oid;
use crate::query::RowStream;
use crate::types::Type;
use crate::{Client, Error, Transaction};
mod private {
@@ -17,6 +20,9 @@ pub trait GenericClient: private::Sealed {
S: AsRef<str> + Sync + Send,
I: IntoIterator<Item = Option<S>> + Sync + Send,
I::IntoIter: ExactSizeIterator + Sync + Send;
/// Query for type information
async fn get_type(&mut self, oid: Oid) -> Result<Type, Error>;
}
impl private::Sealed for Client {}
@@ -30,6 +36,11 @@ impl GenericClient for Client {
{
self.query_raw_txt(statement, params).await
}
/// Query for type information
async fn get_type(&mut self, oid: Oid) -> Result<Type, Error> {
self.get_type_inner(oid).await
}
}
impl private::Sealed for Transaction<'_> {}
@@ -43,4 +54,9 @@ impl GenericClient for Transaction<'_> {
{
self.query_raw_txt(statement, params).await
}
/// Query for type information
async fn get_type(&mut self, oid: Oid) -> Result<Type, Error> {
self.client_mut().get_type(oid).await
}
}

View File

@@ -18,6 +18,7 @@ pub use crate::statement::{Column, Statement};
pub use crate::tls::NoTls;
pub use crate::transaction::Transaction;
pub use crate::transaction_builder::{IsolationLevel, TransactionBuilder};
use crate::types::ToSql;
/// After executing a query, the connection will be in one of these states
#[derive(Clone, Copy, Debug, PartialEq)]
@@ -119,3 +120,9 @@ pub enum SimpleQueryMessage {
/// The number of rows modified or selected is returned.
CommandComplete(u64),
}
fn slice_iter<'a>(
s: &'a [&'a (dyn ToSql + Sync)],
) -> impl ExactSizeIterator<Item = &'a (dyn ToSql + Sync)> + 'a {
s.iter().map(|s| *s as _)
}

View File

@@ -1,14 +1,17 @@
use bytes::BytesMut;
use fallible_iterator::FallibleIterator;
use postgres_protocol2::IsNull;
use postgres_protocol2::message::backend::{Message, RowDescriptionBody};
use postgres_protocol2::message::frontend;
use postgres_protocol2::types::oid_to_sql;
use postgres_types2::Format;
use std::future::Future;
use std::pin::Pin;
use crate::client::{CachedTypeInfo, PartialQuery, Responses};
use bytes::Bytes;
use fallible_iterator::FallibleIterator;
use futures_util::{TryStreamExt, pin_mut};
use postgres_protocol2::message::backend::Message;
use postgres_protocol2::message::frontend;
use tracing::debug;
use crate::client::{CachedTypeInfo, InnerClient};
use crate::codec::FrontendMessage;
use crate::types::{Kind, Oid, Type};
use crate::{Column, Error, Row, Statement};
use crate::{Column, Error, Statement, query, slice_iter};
pub(crate) const TYPEINFO_QUERY: &str = "\
SELECT t.typname, t.typtype, t.typelem, r.rngsubtype, t.typbasetype, n.nspname, t.typrelid
@@ -18,51 +21,22 @@ INNER JOIN pg_catalog.pg_namespace n ON t.typnamespace = n.oid
WHERE t.oid = $1
";
/// we need to make sure we close this prepared statement.
struct CloseStmt<'a, 'b> {
client: Option<&'a mut PartialQuery<'b>>,
name: &'static str,
}
impl<'a> CloseStmt<'a, '_> {
fn close(mut self) -> Result<&'a mut Responses, Error> {
let client = self.client.take().unwrap();
client.send_with_flush(|buf| {
frontend::close(b'S', self.name, buf).map_err(Error::encode)?;
Ok(())
})
}
}
impl Drop for CloseStmt<'_, '_> {
fn drop(&mut self) {
if let Some(client) = self.client.take() {
let _ = client.send_with_flush(|buf| {
frontend::close(b'S', self.name, buf).map_err(Error::encode)?;
Ok(())
});
}
}
}
async fn prepare_typecheck(
client: &mut PartialQuery<'_>,
client: &mut InnerClient,
name: &'static str,
query: &str,
types: &[Type],
) -> Result<Statement, Error> {
let responses = client.send_with_flush(|buf| {
frontend::parse(name, query, [], buf).map_err(Error::encode)?;
frontend::describe(b'S', name, buf).map_err(Error::encode)?;
Ok(())
})?;
let buf = encode(client, name, query, types)?;
let responses = client.send(FrontendMessage::Raw(buf))?;
match responses.next().await? {
Message::ParseComplete => {}
_ => return Err(Error::unexpected_message()),
}
match responses.next().await? {
Message::ParameterDescription(_) => {}
let parameter_description = match responses.next().await? {
Message::ParameterDescription(body) => body,
_ => return Err(Error::unexpected_message()),
};
@@ -72,6 +46,13 @@ async fn prepare_typecheck(
_ => return Err(Error::unexpected_message()),
};
let mut parameters = vec![];
let mut it = parameter_description.parameters();
while let Some(oid) = it.next().map_err(Error::parse)? {
let type_ = Type::from_oid(oid).ok_or_else(Error::unexpected_message)?;
parameters.push(type_);
}
let mut columns = vec![];
if let Some(row_description) = row_description {
let mut it = row_description.fields();
@@ -82,168 +63,103 @@ async fn prepare_typecheck(
}
}
Ok(Statement::new(name, columns))
Ok(Statement::new(name, parameters, columns))
}
fn try_from_cache(typecache: &CachedTypeInfo, oid: Oid) -> Option<Type> {
fn encode(
client: &mut InnerClient,
name: &str,
query: &str,
types: &[Type],
) -> Result<Bytes, Error> {
if types.is_empty() {
debug!("preparing query {}: {}", name, query);
} else {
debug!("preparing query {} with types {:?}: {}", name, types, query);
}
client.with_buf(|buf| {
frontend::parse(name, query, types.iter().map(Type::oid), buf).map_err(Error::encode)?;
frontend::describe(b'S', name, buf).map_err(Error::encode)?;
frontend::sync(buf);
Ok(buf.split().freeze())
})
}
pub async fn get_type(
client: &mut InnerClient,
typecache: &mut CachedTypeInfo,
oid: Oid,
) -> Result<Type, Error> {
if let Some(type_) = Type::from_oid(oid) {
return Some(type_);
return Ok(type_);
}
if let Some(type_) = typecache.types.get(&oid) {
return Some(type_.clone());
return Ok(type_.clone());
};
None
}
let stmt = typeinfo_statement(client, typecache).await?;
pub async fn parse_row_description(
client: &mut PartialQuery<'_>,
typecache: &mut CachedTypeInfo,
row_description: Option<RowDescriptionBody>,
) -> Result<Vec<Column>, Error> {
let mut columns = vec![];
let rows = query::query(client, stmt, slice_iter(&[&oid])).await?;
pin_mut!(rows);
if let Some(row_description) = row_description {
let mut it = row_description.fields();
while let Some(field) = it.next().map_err(Error::parse)? {
let type_ = try_from_cache(typecache, field.type_oid()).unwrap_or(Type::UNKNOWN);
let column = Column::new(field.name().to_string(), type_, field);
columns.push(column);
}
}
let all_known = columns.iter().all(|c| c.type_ != Type::UNKNOWN);
if all_known {
// all known, return early.
return Ok(columns);
}
let typeinfo = "neon_proxy_typeinfo";
// make sure to close the typeinfo statement before exiting.
let mut guard = CloseStmt {
name: typeinfo,
client: None,
};
let client = guard.client.insert(client);
// get the typeinfo statement.
let stmt = prepare_typecheck(client, typeinfo, TYPEINFO_QUERY).await?;
for column in &mut columns {
column.type_ = get_type(client, typecache, &stmt, column.type_oid()).await?;
}
// cancel the close guard.
let responses = guard.close()?;
match responses.next().await? {
Message::CloseComplete => {}
_ => return Err(Error::unexpected_message()),
}
Ok(columns)
}
async fn get_type(
client: &mut PartialQuery<'_>,
typecache: &mut CachedTypeInfo,
stmt: &Statement,
mut oid: Oid,
) -> Result<Type, Error> {
let mut stack = vec![];
let mut type_ = loop {
if let Some(type_) = try_from_cache(typecache, oid) {
break type_;
}
let row = exec(client, stmt, oid).await?;
if stack.len() > 8 {
return Err(Error::unexpected_message());
}
let name: String = row.try_get(0)?;
let type_: i8 = row.try_get(1)?;
let elem_oid: Oid = row.try_get(2)?;
let rngsubtype: Option<Oid> = row.try_get(3)?;
let basetype: Oid = row.try_get(4)?;
let schema: String = row.try_get(5)?;
let relid: Oid = row.try_get(6)?;
let kind = if type_ == b'e' as i8 {
Kind::Enum
} else if type_ == b'p' as i8 {
Kind::Pseudo
} else if basetype != 0 {
Kind::Domain(basetype)
} else if elem_oid != 0 {
stack.push((name, oid, schema));
oid = elem_oid;
continue;
} else if relid != 0 {
Kind::Composite(relid)
} else if let Some(rngsubtype) = rngsubtype {
Kind::Range(rngsubtype)
} else {
Kind::Simple
};
let type_ = Type::new(name, oid, kind, schema);
typecache.types.insert(oid, type_.clone());
break type_;
let row = match rows.try_next().await? {
Some(row) => row,
None => return Err(Error::unexpected_message()),
};
while let Some((name, oid, schema)) = stack.pop() {
type_ = Type::new(name, oid, Kind::Array(type_), schema);
typecache.types.insert(oid, type_.clone());
}
let name: String = row.try_get(0)?;
let type_: i8 = row.try_get(1)?;
let elem_oid: Oid = row.try_get(2)?;
let rngsubtype: Option<Oid> = row.try_get(3)?;
let basetype: Oid = row.try_get(4)?;
let schema: String = row.try_get(5)?;
let relid: Oid = row.try_get(6)?;
let kind = if type_ == b'e' as i8 {
Kind::Enum
} else if type_ == b'p' as i8 {
Kind::Pseudo
} else if basetype != 0 {
Kind::Domain(basetype)
} else if elem_oid != 0 {
let type_ = get_type_rec(client, typecache, elem_oid).await?;
Kind::Array(type_)
} else if relid != 0 {
Kind::Composite(relid)
} else if let Some(rngsubtype) = rngsubtype {
let type_ = get_type_rec(client, typecache, rngsubtype).await?;
Kind::Range(type_)
} else {
Kind::Simple
};
let type_ = Type::new(name, oid, kind, schema);
typecache.types.insert(oid, type_.clone());
Ok(type_)
}
/// exec the typeinfo statement returning one row.
async fn exec(
client: &mut PartialQuery<'_>,
statement: &Statement,
param: Oid,
) -> Result<Row, Error> {
let responses = client.send_with_flush(|buf| {
encode_bind(statement, param, "", buf);
frontend::execute("", 0, buf).map_err(Error::encode)?;
Ok(())
})?;
fn get_type_rec<'a>(
client: &'a mut InnerClient,
typecache: &'a mut CachedTypeInfo,
oid: Oid,
) -> Pin<Box<dyn Future<Output = Result<Type, Error>> + Send + 'a>> {
Box::pin(get_type(client, typecache, oid))
}
match responses.next().await? {
Message::BindComplete => {}
_ => return Err(Error::unexpected_message()),
async fn typeinfo_statement(
client: &mut InnerClient,
typecache: &mut CachedTypeInfo,
) -> Result<Statement, Error> {
if let Some(stmt) = &typecache.typeinfo {
return Ok(stmt.clone());
}
let row = match responses.next().await? {
Message::DataRow(body) => Row::new(statement.clone(), body, Format::Binary)?,
_ => return Err(Error::unexpected_message()),
};
let typeinfo = "neon_proxy_typeinfo";
let stmt = prepare_typecheck(client, typeinfo, TYPEINFO_QUERY, &[]).await?;
match responses.next().await? {
Message::CommandComplete(_) => {}
_ => return Err(Error::unexpected_message()),
};
Ok(row)
}
fn encode_bind(statement: &Statement, param: Oid, portal: &str, buf: &mut BytesMut) {
frontend::bind(
portal,
statement.name(),
[Format::Binary as i16],
[param],
|param, buf| {
oid_to_sql(param, buf);
Ok(IsNull::No)
},
[Format::Binary as i16],
buf,
)
.unwrap();
typecache.typeinfo = Some(stmt.clone());
Ok(stmt)
}

View File

@@ -1,18 +1,60 @@
use std::fmt;
use std::pin::Pin;
use std::task::{Context, Poll};
use bytes::BufMut;
use bytes::{BufMut, Bytes, BytesMut};
use fallible_iterator::FallibleIterator;
use futures_util::{Stream, ready};
use postgres_protocol2::message::backend::Message;
use postgres_protocol2::message::frontend;
use postgres_types2::Format;
use postgres_types2::{Format, ToSql, Type};
use tracing::debug;
use crate::client::{CachedTypeInfo, InnerClient, Responses};
use crate::{Error, ReadyForQueryStatus, Row, Statement};
use crate::client::{InnerClient, Responses};
use crate::codec::FrontendMessage;
use crate::types::IsNull;
use crate::{Column, Error, ReadyForQueryStatus, Row, Statement};
struct BorrowToSqlParamsDebug<'a>(&'a [&'a (dyn ToSql + Sync)]);
impl fmt::Debug for BorrowToSqlParamsDebug<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_list().entries(self.0.iter()).finish()
}
}
pub async fn query<'a, I>(
client: &mut InnerClient,
statement: Statement,
params: I,
) -> Result<RowStream, Error>
where
I: IntoIterator<Item = &'a (dyn ToSql + Sync)>,
I::IntoIter: ExactSizeIterator,
{
let buf = if tracing::enabled!(tracing::Level::DEBUG) {
let params = params.into_iter().collect::<Vec<_>>();
debug!(
"executing statement {} with parameters: {:?}",
statement.name(),
BorrowToSqlParamsDebug(params.as_slice()),
);
encode(client, &statement, params)?
} else {
encode(client, &statement, params)?
};
let responses = start(client, buf).await?;
Ok(RowStream {
responses,
statement,
command_tag: None,
status: ReadyForQueryStatus::Unknown,
output_format: Format::Binary,
})
}
pub async fn query_txt<'a, S, I>(
client: &'a mut InnerClient,
typecache: &mut CachedTypeInfo,
query: &str,
params: I,
) -> Result<RowStream<'a>, Error>
@@ -22,22 +64,8 @@ where
I::IntoIter: ExactSizeIterator,
{
let params = params.into_iter();
let mut client = client.start()?;
// Flow:
// 1. Parse the query
// 2. Inspect the row description for OIDs
// 3. If there's any OIDs we don't already know about, perform the typeinfo routine
// 4. Execute the query
// 5. Sync.
//
// The typeinfo routine:
// 1. Parse the typeinfo query
// 2. Execute the query on each OID
// 3. If the result does not match an OID we know, repeat 2.
// parse the query and get type info
let responses = client.send_with_flush(|buf| {
let buf = client.with_buf(|buf| {
frontend::parse(
"", // unnamed prepared statement
query, // query to parse
@@ -46,30 +74,7 @@ where
)
.map_err(Error::encode)?;
frontend::describe(b'S', "", buf).map_err(Error::encode)?;
Ok(())
})?;
match responses.next().await? {
Message::ParseComplete => {}
_ => return Err(Error::unexpected_message()),
}
match responses.next().await? {
Message::ParameterDescription(_) => {}
_ => return Err(Error::unexpected_message()),
};
let row_description = match responses.next().await? {
Message::RowDescription(body) => Some(body),
Message::NoData => None,
_ => return Err(Error::unexpected_message()),
};
let columns =
crate::prepare::parse_row_description(&mut client, typecache, row_description).await?;
let responses = client.send_with_sync(|buf| {
// Bind, pass params as text, retrieve as text
// Bind, pass params as text, retrieve as binary
match frontend::bind(
"", // empty string selects the unnamed portal
"", // unnamed prepared statement
@@ -92,24 +97,141 @@ where
// Execute
frontend::execute("", 0, buf).map_err(Error::encode)?;
// Sync
frontend::sync(buf);
Ok(())
Ok(buf.split().freeze())
})?;
// now read the responses
let responses = client.send(FrontendMessage::Raw(buf))?;
match responses.next().await? {
Message::ParseComplete => {}
_ => return Err(Error::unexpected_message()),
}
let parameter_description = match responses.next().await? {
Message::ParameterDescription(body) => body,
_ => return Err(Error::unexpected_message()),
};
let row_description = match responses.next().await? {
Message::RowDescription(body) => Some(body),
Message::NoData => None,
_ => return Err(Error::unexpected_message()),
};
match responses.next().await? {
Message::BindComplete => {}
_ => return Err(Error::unexpected_message()),
}
let mut parameters = vec![];
let mut it = parameter_description.parameters();
while let Some(oid) = it.next().map_err(Error::parse)? {
let type_ = Type::from_oid(oid).unwrap_or(Type::UNKNOWN);
parameters.push(type_);
}
let mut columns = vec![];
if let Some(row_description) = row_description {
let mut it = row_description.fields();
while let Some(field) = it.next().map_err(Error::parse)? {
let type_ = Type::from_oid(field.type_oid()).unwrap_or(Type::UNKNOWN);
let column = Column::new(field.name().to_string(), type_, field);
columns.push(column);
}
}
Ok(RowStream {
responses,
statement: Statement::new("", columns),
statement: Statement::new_anonymous(parameters, columns),
command_tag: None,
status: ReadyForQueryStatus::Unknown,
output_format: Format::Text,
})
}
async fn start(client: &mut InnerClient, buf: Bytes) -> Result<&mut Responses, Error> {
let responses = client.send(FrontendMessage::Raw(buf))?;
match responses.next().await? {
Message::BindComplete => {}
_ => return Err(Error::unexpected_message()),
}
Ok(responses)
}
pub fn encode<'a, I>(
client: &mut InnerClient,
statement: &Statement,
params: I,
) -> Result<Bytes, Error>
where
I: IntoIterator<Item = &'a (dyn ToSql + Sync)>,
I::IntoIter: ExactSizeIterator,
{
client.with_buf(|buf| {
encode_bind(statement, params, "", buf)?;
frontend::execute("", 0, buf).map_err(Error::encode)?;
frontend::sync(buf);
Ok(buf.split().freeze())
})
}
pub fn encode_bind<'a, I>(
statement: &Statement,
params: I,
portal: &str,
buf: &mut BytesMut,
) -> Result<(), Error>
where
I: IntoIterator<Item = &'a (dyn ToSql + Sync)>,
I::IntoIter: ExactSizeIterator,
{
let param_types = statement.params();
let params = params.into_iter();
assert!(
param_types.len() == params.len(),
"expected {} parameters but got {}",
param_types.len(),
params.len()
);
let (param_formats, params): (Vec<_>, Vec<_>) = params
.zip(param_types.iter())
.map(|(p, ty)| (p.encode_format(ty) as i16, p))
.unzip();
let params = params.into_iter();
let mut error_idx = 0;
let r = frontend::bind(
portal,
statement.name(),
param_formats,
params.zip(param_types).enumerate(),
|(idx, (param, ty)), buf| match param.to_sql_checked(ty, buf) {
Ok(IsNull::No) => Ok(postgres_protocol2::IsNull::No),
Ok(IsNull::Yes) => Ok(postgres_protocol2::IsNull::Yes),
Err(e) => {
error_idx = idx;
Err(e)
}
},
Some(1),
buf,
);
match r {
Ok(()) => Ok(()),
Err(frontend::BindError::Conversion(e)) => Err(Error::to_sql(e, error_idx)),
Err(frontend::BindError::Serialization(e)) => Err(Error::encode(e)),
}
}
/// A stream of table rows.
pub struct RowStream<'a> {
responses: &'a mut Responses,

View File

@@ -2,13 +2,16 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use bytes::Bytes;
use fallible_iterator::FallibleIterator;
use futures_util::{Stream, ready};
use pin_project_lite::pin_project;
use postgres_protocol2::message::backend::Message;
use postgres_protocol2::message::frontend;
use tracing::debug;
use crate::client::{InnerClient, Responses};
use crate::codec::FrontendMessage;
use crate::{Error, ReadyForQueryStatus, SimpleQueryMessage, SimpleQueryRow};
/// Information about a column of a single query row.
@@ -34,7 +37,8 @@ pub async fn simple_query<'a>(
) -> Result<SimpleQueryStream<'a>, Error> {
debug!("executing simple query: {}", query);
let responses = client.send_simple_query(query)?;
let buf = encode(client, query)?;
let responses = client.send(FrontendMessage::Raw(buf))?;
Ok(SimpleQueryStream {
responses,
@@ -49,7 +53,8 @@ pub async fn batch_execute(
) -> Result<ReadyForQueryStatus, Error> {
debug!("executing statement batch: {}", query);
let responses = client.send_simple_query(query)?;
let buf = encode(client, query)?;
let responses = client.send(FrontendMessage::Raw(buf))?;
loop {
match responses.next().await? {
@@ -63,6 +68,13 @@ pub async fn batch_execute(
}
}
pub(crate) fn encode(client: &mut InnerClient, query: &str) -> Result<Bytes, Error> {
client.with_buf(|buf| {
frontend::query(query, buf).map_err(Error::encode)?;
Ok(buf.split().freeze())
})
}
pin_project! {
/// A stream of simple query results.
pub struct SimpleQueryStream<'a> {

View File

@@ -7,6 +7,7 @@ use postgres_protocol2::message::backend::Field;
struct StatementInner {
name: &'static str,
params: Vec<Type>,
columns: Vec<Column>,
}
@@ -17,14 +18,31 @@ struct StatementInner {
pub struct Statement(Arc<StatementInner>);
impl Statement {
pub(crate) fn new(name: &'static str, columns: Vec<Column>) -> Statement {
Statement(Arc::new(StatementInner { name, columns }))
pub(crate) fn new(name: &'static str, params: Vec<Type>, columns: Vec<Column>) -> Statement {
Statement(Arc::new(StatementInner {
name,
params,
columns,
}))
}
pub(crate) fn new_anonymous(params: Vec<Type>, columns: Vec<Column>) -> Statement {
Statement(Arc::new(StatementInner {
name: "<anonymous>",
params,
columns,
}))
}
pub(crate) fn name(&self) -> &str {
self.0.name
}
/// Returns the expected types of the statement's parameters.
pub fn params(&self) -> &[Type] {
&self.0.params
}
/// Returns information about the columns returned when the statement is queried.
pub fn columns(&self) -> &[Column] {
&self.0.columns
@@ -34,7 +52,7 @@ impl Statement {
/// Information about a column of a query.
pub struct Column {
name: String,
pub(crate) type_: Type,
type_: Type,
// raw fields from RowDescription
table_oid: Oid,

View File

@@ -1,3 +1,6 @@
use postgres_protocol2::message::frontend;
use crate::codec::FrontendMessage;
use crate::query::RowStream;
use crate::{CancelToken, Client, Error, ReadyForQueryStatus};
@@ -16,7 +19,11 @@ impl Drop for Transaction<'_> {
return;
}
let _ = self.client.inner_mut().send_simple_query("ROLLBACK");
let buf = self.client.inner().with_buf(|buf| {
frontend::query("ROLLBACK", buf).unwrap();
buf.split().freeze()
});
let _ = self.client.inner().send(FrontendMessage::Raw(buf));
}
}

View File

@@ -43,7 +43,6 @@ nix.workspace = true
num_cpus.workspace = true
num-traits.workspace = true
once_cell.workspace = true
pageserver_page_api.workspace = true
pin-project-lite.workspace = true
postgres_backend.workspace = true
postgres-protocol.workspace = true
@@ -72,8 +71,6 @@ tokio-rustls.workspace = true
tokio-stream.workspace = true
tokio-util.workspace = true
toml_edit = { workspace = true, features = [ "serde" ] }
tonic.workspace = true
tonic-reflection.workspace = true
tracing.workspace = true
tracing-utils.workspace = true
url.workspace = true

View File

@@ -1,518 +0,0 @@
use std::{collections::HashMap, sync::Arc};
use async_compression::tokio::write::GzipEncoder;
use camino::{Utf8Path, Utf8PathBuf};
use metrics::core::{AtomicU64, GenericCounter};
use pageserver_api::{config::BasebackupCacheConfig, models::TenantState};
use tokio::{
io::{AsyncWriteExt, BufWriter},
sync::mpsc::{UnboundedReceiver, UnboundedSender},
};
use tokio_util::sync::CancellationToken;
use utils::{
id::{TenantId, TenantTimelineId, TimelineId},
lsn::Lsn,
shard::TenantShardId,
};
use crate::{
basebackup::send_basebackup_tarball,
context::{DownloadBehavior, RequestContext},
metrics::{BASEBACKUP_CACHE_ENTRIES, BASEBACKUP_CACHE_PREPARE, BASEBACKUP_CACHE_READ},
task_mgr::TaskKind,
tenant::{
Timeline,
mgr::{TenantManager, TenantSlot},
},
};
pub struct BasebackupPrepareRequest {
pub tenant_shard_id: TenantShardId,
pub timeline_id: TimelineId,
pub lsn: Lsn,
}
pub type BasebackupPrepareSender = UnboundedSender<BasebackupPrepareRequest>;
pub type BasebackupPrepareReceiver = UnboundedReceiver<BasebackupPrepareRequest>;
type BasebackupRemoveEntrySender = UnboundedSender<Utf8PathBuf>;
type BasebackupRemoveEntryReceiver = UnboundedReceiver<Utf8PathBuf>;
/// BasebackupCache stores cached basebackup archives for timelines on local disk.
///
/// The main purpose of this cache is to speed up the startup process of compute nodes
/// after scaling to zero.
/// Thus, the basebackup is stored only for the latest LSN of the timeline and with
/// fixed set of parameters (gzip=true, full_backup=false, replica=false, prev_lsn=none).
///
/// The cache receives prepare requests through the `BasebackupPrepareSender` channel,
/// generates a basebackup from the timeline in the background, and stores it on disk.
///
/// Basebackup requests are pretty rare. We expect ~thousands of entries in the cache
/// and ~1 RPS for get requests.
pub struct BasebackupCache {
data_dir: Utf8PathBuf,
config: BasebackupCacheConfig,
tenant_manager: Arc<TenantManager>,
remove_entry_sender: BasebackupRemoveEntrySender,
entries: std::sync::Mutex<HashMap<TenantTimelineId, Lsn>>,
cancel: CancellationToken,
read_hit_count: GenericCounter<AtomicU64>,
read_miss_count: GenericCounter<AtomicU64>,
read_err_count: GenericCounter<AtomicU64>,
prepare_ok_count: GenericCounter<AtomicU64>,
prepare_skip_count: GenericCounter<AtomicU64>,
prepare_err_count: GenericCounter<AtomicU64>,
}
impl BasebackupCache {
/// Creates a BasebackupCache and spawns the background task.
/// The initialization of the cache is performed in the background and does not
/// block the caller. The cache will return `None` for any get requests until
/// initialization is complete.
pub fn spawn(
runtime_handle: &tokio::runtime::Handle,
data_dir: Utf8PathBuf,
config: Option<BasebackupCacheConfig>,
prepare_receiver: BasebackupPrepareReceiver,
tenant_manager: Arc<TenantManager>,
cancel: CancellationToken,
) -> Arc<Self> {
let (remove_entry_sender, remove_entry_receiver) = tokio::sync::mpsc::unbounded_channel();
let enabled = config.is_some();
let cache = Arc::new(BasebackupCache {
data_dir,
config: config.unwrap_or_default(),
tenant_manager,
remove_entry_sender,
entries: std::sync::Mutex::new(HashMap::new()),
cancel,
read_hit_count: BASEBACKUP_CACHE_READ.with_label_values(&["hit"]),
read_miss_count: BASEBACKUP_CACHE_READ.with_label_values(&["miss"]),
read_err_count: BASEBACKUP_CACHE_READ.with_label_values(&["error"]),
prepare_ok_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["ok"]),
prepare_skip_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["skip"]),
prepare_err_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["error"]),
});
if enabled {
runtime_handle.spawn(
cache
.clone()
.background(prepare_receiver, remove_entry_receiver),
);
}
cache
}
/// Gets a basebackup entry from the cache.
/// If the entry is found, opens a file with the basebackup archive and returns it.
/// The open file descriptor will prevent the file system from deleting the file
/// even if the entry is removed from the cache in the background.
pub async fn get(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
lsn: Lsn,
) -> Option<tokio::fs::File> {
// Fast path. Check if the entry exists using the in-memory state.
let tti = TenantTimelineId::new(tenant_id, timeline_id);
if self.entries.lock().unwrap().get(&tti) != Some(&lsn) {
self.read_miss_count.inc();
return None;
}
let path = self.entry_path(tenant_id, timeline_id, lsn);
match tokio::fs::File::open(path).await {
Ok(file) => {
self.read_hit_count.inc();
Some(file)
}
Err(e) => {
if e.kind() == std::io::ErrorKind::NotFound {
// We may end up here if the basebackup was concurrently removed by the cleanup task.
self.read_miss_count.inc();
} else {
self.read_err_count.inc();
tracing::warn!("Unexpected error opening basebackup cache file: {:?}", e);
}
None
}
}
}
// Private methods.
fn entry_filename(tenant_id: TenantId, timeline_id: TimelineId, lsn: Lsn) -> String {
// The default format for LSN is 0/ABCDEF.
// The backslash is not filename friendly, so serialize it as plain hex.
let lsn = lsn.0;
format!("basebackup_{tenant_id}_{timeline_id}_{lsn:016X}.tar.gz")
}
fn entry_path(&self, tenant_id: TenantId, timeline_id: TimelineId, lsn: Lsn) -> Utf8PathBuf {
self.data_dir
.join(Self::entry_filename(tenant_id, timeline_id, lsn))
}
fn entry_tmp_path(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
lsn: Lsn,
) -> Utf8PathBuf {
self.data_dir
.join("tmp")
.join(Self::entry_filename(tenant_id, timeline_id, lsn))
}
fn parse_entry_filename(filename: &str) -> Option<(TenantId, TimelineId, Lsn)> {
let parts: Vec<&str> = filename
.strip_prefix("basebackup_")?
.strip_suffix(".tar.gz")?
.split('_')
.collect();
if parts.len() != 3 {
return None;
}
let tenant_id = parts[0].parse::<TenantId>().ok()?;
let timeline_id = parts[1].parse::<TimelineId>().ok()?;
let lsn = Lsn(u64::from_str_radix(parts[2], 16).ok()?);
Some((tenant_id, timeline_id, lsn))
}
async fn cleanup(&self) -> anyhow::Result<()> {
// Cleanup tmp directory.
let tmp_dir = self.data_dir.join("tmp");
let mut tmp_dir = tokio::fs::read_dir(&tmp_dir).await?;
while let Some(dir_entry) = tmp_dir.next_entry().await? {
if let Err(e) = tokio::fs::remove_file(dir_entry.path()).await {
tracing::warn!("Failed to remove basebackup cache tmp file: {:#}", e);
}
}
// Remove outdated entries.
let entries_old = self.entries.lock().unwrap().clone();
let mut entries_new = HashMap::new();
for (tenant_shard_id, tenant_slot) in self.tenant_manager.list() {
if !tenant_shard_id.is_shard_zero() {
continue;
}
let TenantSlot::Attached(tenant) = tenant_slot else {
continue;
};
let tenant_id = tenant_shard_id.tenant_id;
for timeline in tenant.list_timelines() {
let tti = TenantTimelineId::new(tenant_id, timeline.timeline_id);
if let Some(&entry_lsn) = entries_old.get(&tti) {
if timeline.get_last_record_lsn() <= entry_lsn {
entries_new.insert(tti, entry_lsn);
}
}
}
}
for (&tti, &lsn) in entries_old.iter() {
if !entries_new.contains_key(&tti) {
self.remove_entry_sender
.send(self.entry_path(tti.tenant_id, tti.timeline_id, lsn))
.unwrap();
}
}
BASEBACKUP_CACHE_ENTRIES.set(entries_new.len() as i64);
*self.entries.lock().unwrap() = entries_new;
Ok(())
}
async fn on_startup(&self) -> anyhow::Result<()> {
// Create data_dir and tmp directory if they do not exist.
tokio::fs::create_dir_all(&self.data_dir.join("tmp"))
.await
.map_err(|e| {
anyhow::anyhow!(
"Failed to create basebackup cache data_dir {:?}: {:?}",
self.data_dir,
e
)
})?;
// Read existing entries from the data_dir and add them to in-memory state.
let mut entries = HashMap::new();
let mut dir = tokio::fs::read_dir(&self.data_dir).await?;
while let Some(dir_entry) = dir.next_entry().await? {
let filename = dir_entry.file_name();
if filename == "tmp" {
// Skip the tmp directory.
continue;
}
let parsed = Self::parse_entry_filename(filename.to_string_lossy().as_ref());
let Some((tenant_id, timeline_id, lsn)) = parsed else {
tracing::warn!("Invalid basebackup cache file name: {:?}", filename);
continue;
};
let tti = TenantTimelineId::new(tenant_id, timeline_id);
use std::collections::hash_map::Entry::*;
match entries.entry(tti) {
Occupied(mut entry) => {
let entry_lsn = *entry.get();
// Leave only the latest entry, remove the old one.
if lsn < entry_lsn {
self.remove_entry_sender.send(self.entry_path(
tenant_id,
timeline_id,
lsn,
))?;
} else if lsn > entry_lsn {
self.remove_entry_sender.send(self.entry_path(
tenant_id,
timeline_id,
entry_lsn,
))?;
entry.insert(lsn);
} else {
// Two different filenames parsed to the same timline_id and LSN.
// Should never happen.
return Err(anyhow::anyhow!(
"Duplicate basebackup cache entry with the same LSN: {:?}",
filename
));
}
}
Vacant(entry) => {
entry.insert(lsn);
}
}
}
BASEBACKUP_CACHE_ENTRIES.set(entries.len() as i64);
*self.entries.lock().unwrap() = entries;
Ok(())
}
async fn background(
self: Arc<Self>,
mut prepare_receiver: BasebackupPrepareReceiver,
mut remove_entry_receiver: BasebackupRemoveEntryReceiver,
) {
// Panic in the background is a safe fallback.
// It will drop receivers and the cache will be effectively disabled.
self.on_startup()
.await
.expect("Failed to initialize basebackup cache");
let mut cleanup_ticker = tokio::time::interval(self.config.cleanup_period);
cleanup_ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
tokio::select! {
Some(req) = prepare_receiver.recv() => {
if let Err(err) = self.prepare_basebackup(
req.tenant_shard_id,
req.timeline_id,
req.lsn,
).await {
tracing::info!("Failed to prepare basebackup: {:#}", err);
self.prepare_err_count.inc();
continue;
}
}
Some(req) = remove_entry_receiver.recv() => {
if let Err(e) = tokio::fs::remove_file(req).await {
tracing::warn!("Failed to remove basebackup cache file: {:#}", e);
}
}
_ = cleanup_ticker.tick() => {
self.cleanup().await.unwrap_or_else(|e| {
tracing::warn!("Failed to clean up basebackup cache: {:#}", e);
});
}
_ = self.cancel.cancelled() => {
tracing::info!("BasebackupCache background task cancelled");
break;
}
}
}
}
/// Prepare a basebackup for the given timeline.
///
/// If the basebackup already exists with a higher LSN or the timeline already
/// has a higher last_record_lsn, skip the preparation.
///
/// The basebackup is prepared in a temporary directory and then moved to the final
/// location to make the operation atomic.
async fn prepare_basebackup(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
req_lsn: Lsn,
) -> anyhow::Result<()> {
tracing::info!(
tenant_id = %tenant_shard_id.tenant_id,
%timeline_id,
%req_lsn,
"Preparing basebackup for timeline",
);
let tti = TenantTimelineId::new(tenant_shard_id.tenant_id, timeline_id);
{
let entries = self.entries.lock().unwrap();
if let Some(&entry_lsn) = entries.get(&tti) {
if entry_lsn >= req_lsn {
tracing::info!(
%timeline_id,
%req_lsn,
%entry_lsn,
"Basebackup entry already exists for timeline with higher LSN, skipping basebackup",
);
self.prepare_skip_count.inc();
return Ok(());
}
}
if entries.len() as i64 >= self.config.max_size_entries {
tracing::info!(
%timeline_id,
%req_lsn,
"Basebackup cache is full, skipping basebackup",
);
self.prepare_skip_count.inc();
return Ok(());
}
}
let tenant = self
.tenant_manager
.get_attached_tenant_shard(tenant_shard_id)?;
let tenant_state = tenant.current_state();
if tenant_state != TenantState::Active {
anyhow::bail!(
"Tenant {} is not active, current state: {:?}",
tenant_shard_id.tenant_id,
tenant_state
)
}
let timeline = tenant.get_timeline(timeline_id, true)?;
let last_record_lsn = timeline.get_last_record_lsn();
if last_record_lsn > req_lsn {
tracing::info!(
%timeline_id,
%req_lsn,
%last_record_lsn,
"Timeline has a higher LSN than the requested one, skipping basebackup",
);
self.prepare_skip_count.inc();
return Ok(());
}
let entry_tmp_path = self.entry_tmp_path(tenant_shard_id.tenant_id, timeline_id, req_lsn);
let res = self
.prepare_basebackup_tmp(&entry_tmp_path, &timeline, req_lsn)
.await;
if let Err(err) = res {
tracing::info!("Failed to prepare basebackup tmp file: {:#}", err);
// Try to clean up tmp file. If we fail, the background clean up task will take care of it.
match tokio::fs::remove_file(&entry_tmp_path).await {
Ok(_) => {}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
Err(e) => {
tracing::info!("Failed to remove basebackup tmp file: {:?}", e);
}
}
return Err(err);
}
// Move the tmp file to the final location atomically.
let entry_path = self.entry_path(tenant_shard_id.tenant_id, timeline_id, req_lsn);
tokio::fs::rename(&entry_tmp_path, &entry_path).await?;
let mut entries = self.entries.lock().unwrap();
if let Some(old_lsn) = entries.insert(tti, req_lsn) {
// Remove the old entry if it exists.
self.remove_entry_sender
.send(self.entry_path(tenant_shard_id.tenant_id, timeline_id, old_lsn))
.unwrap();
}
BASEBACKUP_CACHE_ENTRIES.set(entries.len() as i64);
self.prepare_ok_count.inc();
Ok(())
}
/// Prepares a basebackup in a temporary file.
async fn prepare_basebackup_tmp(
&self,
emptry_tmp_path: &Utf8Path,
timeline: &Arc<Timeline>,
req_lsn: Lsn,
) -> anyhow::Result<()> {
let ctx = RequestContext::new(TaskKind::BasebackupCache, DownloadBehavior::Download);
let ctx = ctx.with_scope_timeline(timeline);
let file = tokio::fs::File::create(emptry_tmp_path).await?;
let mut writer = BufWriter::new(file);
let mut encoder = GzipEncoder::with_quality(
&mut writer,
// Level::Best because compression is not on the hot path of basebackup requests.
// The decompression is almost not affected by the compression level.
async_compression::Level::Best,
);
// We may receive a request before the WAL record is applied to the timeline.
// Wait for the requested LSN to be applied.
timeline
.wait_lsn(
req_lsn,
crate::tenant::timeline::WaitLsnWaiter::BaseBackupCache,
crate::tenant::timeline::WaitLsnTimeout::Default,
&ctx,
)
.await?;
send_basebackup_tarball(
&mut encoder,
timeline,
Some(req_lsn),
None,
false,
false,
&ctx,
)
.await?;
encoder.shutdown().await?;
writer.flush().await?;
writer.into_inner().sync_all().await?;
Ok(())
}
}

View File

@@ -16,7 +16,6 @@ use http_utils::tls_certs::ReloadingCertificateResolver;
use metrics::launch_timestamp::{LaunchTimestamp, set_launch_timestamp_metric};
use metrics::set_build_info_metric;
use nix::sys::socket::{setsockopt, sockopt};
use pageserver::basebackup_cache::BasebackupCache;
use pageserver::config::{PageServerConf, PageserverIdentity, ignored_fields};
use pageserver::controller_upcall_client::StorageControllerUpcallClient;
use pageserver::deletion_queue::DeletionQueue;
@@ -388,30 +387,23 @@ fn start_pageserver(
// We need to release the lock file only when the process exits.
std::mem::forget(lock_file);
// Bind the HTTP, libpq, and gRPC ports early, to error out if they are
// already in use.
info!(
"Starting pageserver http handler on {} with auth {:#?}",
conf.listen_http_addr, conf.http_auth_type
);
let http_listener = tcp_listener::bind(&conf.listen_http_addr)?;
// Bind the HTTP and libpq ports early, so that if they are in use by some other
// process, we error out early.
let http_addr = &conf.listen_http_addr;
info!("Starting pageserver http handler on {http_addr}");
let http_listener = tcp_listener::bind(http_addr)?;
let https_listener = match conf.listen_https_addr.as_ref() {
Some(https_addr) => {
info!(
"Starting pageserver https handler on {https_addr} with auth {:#?}",
conf.http_auth_type
);
info!("Starting pageserver https handler on {https_addr}");
Some(tcp_listener::bind(https_addr)?)
}
None => None,
};
info!(
"Starting pageserver pg protocol handler on {} with auth {:#?}",
conf.listen_pg_addr, conf.pg_auth_type,
);
let pageserver_listener = tcp_listener::bind(&conf.listen_pg_addr)?;
let pg_addr = &conf.listen_pg_addr;
info!("Starting pageserver pg protocol handler on {pg_addr}");
let pageserver_listener = tcp_listener::bind(pg_addr)?;
// Enable SO_KEEPALIVE on the socket, to detect dead connections faster.
// These are configured via net.ipv4.tcp_keepalive_* sysctls.
@@ -420,15 +412,6 @@ fn start_pageserver(
// support enabling keepalives while using the default OS sysctls.
setsockopt(&pageserver_listener, sockopt::KeepAlive, &true)?;
let mut grpc_listener = None;
if let Some(grpc_addr) = &conf.listen_grpc_addr {
info!(
"Starting pageserver gRPC handler on {grpc_addr} with auth {:#?}",
conf.grpc_auth_type
);
grpc_listener = Some(tcp_listener::bind(grpc_addr).map_err(|e| anyhow!("{e}"))?);
}
// Launch broker client
// The storage_broker::connect call needs to happen inside a tokio runtime thread.
let broker_client = WALRECEIVER_RUNTIME
@@ -456,8 +439,7 @@ fn start_pageserver(
// Initialize authentication for incoming connections
let http_auth;
let pg_auth;
let grpc_auth;
if [conf.http_auth_type, conf.pg_auth_type, conf.grpc_auth_type].contains(&AuthType::NeonJWT) {
if conf.http_auth_type == AuthType::NeonJWT || conf.pg_auth_type == AuthType::NeonJWT {
// unwrap is ok because check is performed when creating config, so path is set and exists
let key_path = conf.auth_validation_public_key_path.as_ref().unwrap();
info!("Loading public key(s) for verifying JWT tokens from {key_path:?}");
@@ -465,27 +447,22 @@ fn start_pageserver(
let jwt_auth = JwtAuth::from_key_path(key_path)?;
let auth: Arc<SwappableJwtAuth> = Arc::new(SwappableJwtAuth::new(jwt_auth));
http_auth = match conf.http_auth_type {
http_auth = match &conf.http_auth_type {
AuthType::Trust => None,
AuthType::NeonJWT => Some(auth.clone()),
};
pg_auth = match conf.pg_auth_type {
AuthType::Trust => None,
AuthType::NeonJWT => Some(auth.clone()),
};
grpc_auth = match conf.grpc_auth_type {
pg_auth = match &conf.pg_auth_type {
AuthType::Trust => None,
AuthType::NeonJWT => Some(auth),
};
} else {
http_auth = None;
pg_auth = None;
grpc_auth = None;
}
info!("Using auth for http API: {:#?}", conf.http_auth_type);
info!("Using auth for pg connections: {:#?}", conf.pg_auth_type);
let tls_server_config = if conf.listen_https_addr.is_some()
|| conf.enable_tls_page_service_api
|| conf.listen_grpc_tls
let tls_server_config = if conf.listen_https_addr.is_some() || conf.enable_tls_page_service_api
{
let resolver = BACKGROUND_RUNTIME.block_on(ReloadingCertificateResolver::new(
"main",
@@ -564,8 +541,6 @@ fn start_pageserver(
pageserver::l0_flush::L0FlushGlobalState::new(conf.l0_flush.clone());
// Scan the local 'tenants/' directory and start loading the tenants
let (basebackup_prepare_sender, basebackup_prepare_receiver) =
tokio::sync::mpsc::unbounded_channel();
let deletion_queue_client = deletion_queue.new_client();
let background_purges = mgr::BackgroundPurges::default();
let tenant_manager = BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(
@@ -576,22 +551,12 @@ fn start_pageserver(
remote_storage: remote_storage.clone(),
deletion_queue_client,
l0_flush_global_state,
basebackup_prepare_sender,
},
order,
shutdown_pageserver.clone(),
))?;
let tenant_manager = Arc::new(tenant_manager);
let basebackup_cache = BasebackupCache::spawn(
BACKGROUND_RUNTIME.handle(),
conf.basebackup_cache_dir(),
conf.basebackup_cache_config.clone(),
basebackup_prepare_receiver,
Arc::clone(&tenant_manager),
shutdown_pageserver.child_token(),
);
BACKGROUND_RUNTIME.spawn({
let shutdown_pageserver = shutdown_pageserver.clone();
let drive_init = async move {
@@ -793,33 +758,13 @@ fn start_pageserver(
tokio::net::TcpListener::from_std(pageserver_listener)
.context("create tokio listener")?
},
conf.enable_tls_page_service_api
.then(|| tls_server_config.clone())
.flatten(),
basebackup_cache.clone(),
if conf.enable_tls_page_service_api {
tls_server_config
} else {
None
},
);
// Spawn a Pageserver gRPC server task. It will spawn separate tasks for
// each stream/request.
//
// TODO: this uses a separate Tokio runtime for the page service. If we want
// other gRPC services, they will need their own port and runtime. Is this
// necessary?
let mut page_service_grpc = None;
if let Some(grpc_listener) = grpc_listener {
page_service_grpc = Some(page_service::spawn_grpc(
conf,
tenant_manager.clone(),
grpc_auth,
otel_guard.as_ref().map(|g| g.dispatch.clone()),
grpc_listener,
conf.listen_grpc_tls
.then(|| tls_server_config.clone())
.flatten(),
basebackup_cache,
)?);
}
// All started up! Now just sit and wait for shutdown signal.
BACKGROUND_RUNTIME.block_on(async move {
let signal_token = CancellationToken::new();
@@ -838,7 +783,6 @@ fn start_pageserver(
http_endpoint_listener,
https_endpoint_listener,
page_service,
page_service_grpc,
consumption_metrics_tasks,
disk_usage_eviction_task,
&tenant_manager,

View File

@@ -58,18 +58,11 @@ pub struct PageServerConf {
pub listen_http_addr: String,
/// Example: 127.0.0.1:9899
pub listen_https_addr: Option<String>,
/// If set, expose a gRPC API on this address.
/// Example: 127.0.0.1:51051
///
/// EXPERIMENTAL: this protocol is unstable and under active development.
pub listen_grpc_addr: Option<String>,
/// If true, enable TLS for the gRPC server, using ssl_key_file and ssl_cert_file.
pub listen_grpc_tls: bool,
/// Path to a file with certificate's private key for https and gRPC API.
/// Path to a file with certificate's private key for https API.
/// Default: server.key
pub ssl_key_file: Utf8PathBuf,
/// Path to a file with a X509 certificate for https and gRPC API.
/// Path to a file with a X509 certificate for https API.
/// Default: server.crt
pub ssl_cert_file: Utf8PathBuf,
/// Period to reload certificate and private key from files.
@@ -107,8 +100,6 @@ pub struct PageServerConf {
pub http_auth_type: AuthType,
/// authentication method for libpq connections from compute
pub pg_auth_type: AuthType,
/// authentication method for gRPC connections from compute
pub grpc_auth_type: AuthType,
/// Path to a file or directory containing public key(s) for verifying JWT tokens.
/// Used for both mgmt and compute auth, if enabled.
pub auth_validation_public_key_path: Option<Utf8PathBuf>,
@@ -230,7 +221,7 @@ pub struct PageServerConf {
pub tracing: Option<pageserver_api::config::Tracing>,
/// Enable TLS in the libpq page service API.
/// Enable TLS in page service API.
/// Does not force TLS: the client negotiates TLS usage during the handshake.
/// Uses key and certificate from ssl_key_file/ssl_cert_file.
pub enable_tls_page_service_api: bool,
@@ -241,8 +232,6 @@ pub struct PageServerConf {
pub dev_mode: bool,
pub timeline_import_config: pageserver_api::config::TimelineImportConfig,
pub basebackup_cache_config: Option<pageserver_api::config::BasebackupCacheConfig>,
}
/// Token for authentication to safekeepers
@@ -272,10 +261,6 @@ impl PageServerConf {
self.workdir.join("metadata.json")
}
pub fn basebackup_cache_dir(&self) -> Utf8PathBuf {
self.workdir.join("basebackup_cache")
}
pub fn deletion_list_path(&self, sequence: u64) -> Utf8PathBuf {
// Encode a version in the filename, so that if we ever switch away from JSON we can
// increment this.
@@ -364,8 +349,6 @@ impl PageServerConf {
listen_pg_addr,
listen_http_addr,
listen_https_addr,
listen_grpc_addr,
listen_grpc_tls,
ssl_key_file,
ssl_cert_file,
ssl_cert_reload_period,
@@ -380,7 +363,6 @@ impl PageServerConf {
pg_distrib_dir,
http_auth_type,
pg_auth_type,
grpc_auth_type,
auth_validation_public_key_path,
remote_storage,
broker_endpoint,
@@ -425,7 +407,6 @@ impl PageServerConf {
enable_tls_page_service_api,
dev_mode,
timeline_import_config,
basebackup_cache_config,
} = config_toml;
let mut conf = PageServerConf {
@@ -435,8 +416,6 @@ impl PageServerConf {
listen_pg_addr,
listen_http_addr,
listen_https_addr,
listen_grpc_addr,
listen_grpc_tls,
ssl_key_file,
ssl_cert_file,
ssl_cert_reload_period,
@@ -449,7 +428,6 @@ impl PageServerConf {
max_file_descriptors,
http_auth_type,
pg_auth_type,
grpc_auth_type,
auth_validation_public_key_path,
remote_storage_config: remote_storage,
broker_endpoint,
@@ -483,7 +461,6 @@ impl PageServerConf {
enable_tls_page_service_api,
dev_mode,
timeline_import_config,
basebackup_cache_config,
// ------------------------------------------------------------
// fields that require additional validation or custom handling
@@ -546,9 +523,7 @@ impl PageServerConf {
// custom validation code that covers more than one field in isolation
// ------------------------------------------------------------
if [conf.http_auth_type, conf.pg_auth_type, conf.grpc_auth_type]
.contains(&AuthType::NeonJWT)
{
if conf.http_auth_type == AuthType::NeonJWT || conf.pg_auth_type == AuthType::NeonJWT {
let auth_validation_public_key_path = conf
.auth_validation_public_key_path
.get_or_insert_with(|| workdir.join("auth_public_key.pem"));

View File

@@ -18,25 +18,12 @@ use crate::tenant::timeline::logical_size::CurrentLogicalSize;
// management.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub(super) enum Name {
/// Timeline last_record_lsn, absolute.
/// Timeline last_record_lsn, absolute
#[serde(rename = "written_size")]
WrittenSize,
/// Timeline last_record_lsn, incremental
#[serde(rename = "written_data_bytes_delta")]
WrittenSizeDelta,
/// Written bytes only on this timeline (not including ancestors):
/// written_size - ancestor_lsn
///
/// On the root branch, this is equivalent to `written_size`.
#[serde(rename = "written_size_since_parent")]
WrittenSizeSinceParent,
/// PITR history size only on this timeline (not including ancestors):
/// last_record_lsn - max(pitr_cutoff, ancestor_lsn).
///
/// On the root branch, this is its entire PITR history size. Not emitted if GC hasn't computed
/// the PITR cutoff yet. 0 if PITR is disabled.
#[serde(rename = "pitr_history_size_since_parent")]
PitrHistorySizeSinceParent,
/// Timeline logical size
#[serde(rename = "timeline_logical_size")]
LogicalSize,
@@ -170,32 +157,6 @@ impl MetricsKey {
.incremental_values()
}
/// `written_size` - `ancestor_lsn`.
const fn written_size_since_parent(
tenant_id: TenantId,
timeline_id: TimelineId,
) -> AbsoluteValueFactory {
MetricsKey {
tenant_id,
timeline_id: Some(timeline_id),
metric: Name::WrittenSizeSinceParent,
}
.absolute_values()
}
/// `written_size` - max(`pitr_cutoff`, `ancestor_lsn`).
const fn pitr_history_size_since_parent(
tenant_id: TenantId,
timeline_id: TimelineId,
) -> AbsoluteValueFactory {
MetricsKey {
tenant_id,
timeline_id: Some(timeline_id),
metric: Name::PitrHistorySizeSinceParent,
}
.absolute_values()
}
/// Exact [`Timeline::get_current_logical_size`].
///
/// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
@@ -373,13 +334,7 @@ impl TenantSnapshot {
struct TimelineSnapshot {
loaded_at: (Lsn, SystemTime),
last_record_lsn: Lsn,
ancestor_lsn: Lsn,
current_exact_logical_size: Option<u64>,
/// Whether PITR is enabled (pitr_interval > 0).
pitr_enabled: bool,
/// The PITR cutoff LSN. None if not yet initialized. If PITR is disabled, this is approximately
/// Some(last_record_lsn), but may lag behind it since it's computed periodically.
pitr_cutoff: Option<Lsn>,
}
impl TimelineSnapshot {
@@ -399,9 +354,6 @@ impl TimelineSnapshot {
} else {
let loaded_at = t.loaded_at;
let last_record_lsn = t.get_last_record_lsn();
let ancestor_lsn = t.get_ancestor_lsn();
let pitr_enabled = !t.get_pitr_interval().is_zero();
let pitr_cutoff = t.gc_info.read().unwrap().cutoffs.time;
let current_exact_logical_size = {
let span = tracing::info_span!("collect_metrics_iteration", tenant_id = %t.tenant_shard_id.tenant_id, timeline_id = %t.timeline_id);
@@ -421,10 +373,7 @@ impl TimelineSnapshot {
Ok(Some(TimelineSnapshot {
loaded_at,
last_record_lsn,
ancestor_lsn,
current_exact_logical_size,
pitr_enabled,
pitr_cutoff,
}))
}
}
@@ -475,8 +424,6 @@ impl TimelineSnapshot {
let up_to = now;
let written_size_last = written_size_now.value.max(prev.1); // don't regress
if let Some(delta) = written_size_now.value.checked_sub(prev.1) {
let key_value = written_size_delta_key.from_until(prev.0, up_to, delta);
// written_size_delta
@@ -494,27 +441,6 @@ impl TimelineSnapshot {
});
}
// Compute the branch-local written size.
let written_size_since_parent_key =
MetricsKey::written_size_since_parent(tenant_id, timeline_id);
metrics.push(
written_size_since_parent_key
.at(now, written_size_last.saturating_sub(self.ancestor_lsn.0)),
);
// Compute the branch-local PITR history size. Not emitted if GC hasn't yet computed the
// PITR cutoff. 0 if PITR is disabled.
let pitr_history_size_since_parent_key =
MetricsKey::pitr_history_size_since_parent(tenant_id, timeline_id);
if !self.pitr_enabled {
metrics.push(pitr_history_size_since_parent_key.at(now, 0));
} else if let Some(pitr_cutoff) = self.pitr_cutoff {
metrics.push(pitr_history_size_since_parent_key.at(
now,
written_size_last.saturating_sub(pitr_cutoff.max(self.ancestor_lsn).0),
));
}
{
let factory = MetricsKey::timeline_logical_size(tenant_id, timeline_id);
let current_or_previous = self

View File

@@ -12,17 +12,12 @@ fn startup_collected_timeline_metrics_before_advancing() {
let cache = HashMap::new();
let initdb_lsn = Lsn(0x10000);
let pitr_cutoff = Lsn(0x11000);
let disk_consistent_lsn = Lsn(initdb_lsn.0 * 2);
let logical_size = 0x42000;
let snap = TimelineSnapshot {
loaded_at: (disk_consistent_lsn, SystemTime::now()),
last_record_lsn: disk_consistent_lsn,
ancestor_lsn: Lsn(0),
current_exact_logical_size: Some(logical_size),
pitr_enabled: true,
pitr_cutoff: Some(pitr_cutoff),
current_exact_logical_size: Some(0x42000),
};
let now = DateTime::<Utc>::from(SystemTime::now());
@@ -38,11 +33,7 @@ fn startup_collected_timeline_metrics_before_advancing() {
0
),
MetricsKey::written_size(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
MetricsKey::written_size_since_parent(tenant_id, timeline_id)
.at(now, disk_consistent_lsn.0),
MetricsKey::pitr_history_size_since_parent(tenant_id, timeline_id)
.at(now, disk_consistent_lsn.0 - pitr_cutoff.0),
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, logical_size)
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000)
]
);
}
@@ -58,9 +49,7 @@ fn startup_collected_timeline_metrics_second_round() {
let before = DateTime::<Utc>::from(before);
let initdb_lsn = Lsn(0x10000);
let pitr_cutoff = Lsn(0x11000);
let disk_consistent_lsn = Lsn(initdb_lsn.0 * 2);
let logical_size = 0x42000;
let mut metrics = Vec::new();
let cache = HashMap::from([MetricsKey::written_size(tenant_id, timeline_id)
@@ -70,10 +59,7 @@ fn startup_collected_timeline_metrics_second_round() {
let snap = TimelineSnapshot {
loaded_at: (disk_consistent_lsn, init),
last_record_lsn: disk_consistent_lsn,
ancestor_lsn: Lsn(0),
current_exact_logical_size: Some(logical_size),
pitr_enabled: true,
pitr_cutoff: Some(pitr_cutoff),
current_exact_logical_size: Some(0x42000),
};
snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache);
@@ -83,11 +69,7 @@ fn startup_collected_timeline_metrics_second_round() {
&[
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(before, now, 0),
MetricsKey::written_size(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
MetricsKey::written_size_since_parent(tenant_id, timeline_id)
.at(now, disk_consistent_lsn.0),
MetricsKey::pitr_history_size_since_parent(tenant_id, timeline_id)
.at(now, disk_consistent_lsn.0 - pitr_cutoff.0),
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, logical_size)
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000)
]
);
}
@@ -104,9 +86,7 @@ fn startup_collected_timeline_metrics_nth_round_at_same_lsn() {
let before = DateTime::<Utc>::from(before);
let initdb_lsn = Lsn(0x10000);
let pitr_cutoff = Lsn(0x11000);
let disk_consistent_lsn = Lsn(initdb_lsn.0 * 2);
let logical_size = 0x42000;
let mut metrics = Vec::new();
let cache = HashMap::from([
@@ -123,10 +103,7 @@ fn startup_collected_timeline_metrics_nth_round_at_same_lsn() {
let snap = TimelineSnapshot {
loaded_at: (disk_consistent_lsn, init),
last_record_lsn: disk_consistent_lsn,
ancestor_lsn: Lsn(0),
current_exact_logical_size: Some(logical_size),
pitr_enabled: true,
pitr_cutoff: Some(pitr_cutoff),
current_exact_logical_size: Some(0x42000),
};
snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache);
@@ -136,18 +113,16 @@ fn startup_collected_timeline_metrics_nth_round_at_same_lsn() {
&[
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(just_before, now, 0),
MetricsKey::written_size(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
MetricsKey::written_size_since_parent(tenant_id, timeline_id)
.at(now, disk_consistent_lsn.0),
MetricsKey::pitr_history_size_since_parent(tenant_id, timeline_id)
.at(now, disk_consistent_lsn.0 - pitr_cutoff.0),
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, logical_size)
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000)
]
);
}
/// Tests that written sizes do not regress across restarts.
#[test]
fn post_restart_written_sizes_with_rolled_back_last_record_lsn() {
// it can happen that we lose the inmemorylayer but have previously sent metrics and we
// should never go backwards
let tenant_id = TenantId::generate();
let timeline_id = TimelineId::generate();
@@ -165,10 +140,7 @@ fn post_restart_written_sizes_with_rolled_back_last_record_lsn() {
let snap = TimelineSnapshot {
loaded_at: (Lsn(50), at_restart),
last_record_lsn: Lsn(50),
ancestor_lsn: Lsn(0),
current_exact_logical_size: None,
pitr_enabled: true,
pitr_cutoff: Some(Lsn(20)),
};
let mut cache = HashMap::from([
@@ -197,8 +169,6 @@ fn post_restart_written_sizes_with_rolled_back_last_record_lsn() {
0
),
MetricsKey::written_size(tenant_id, timeline_id).at(now, 100),
MetricsKey::written_size_since_parent(tenant_id, timeline_id).at(now, 100),
MetricsKey::pitr_history_size_since_parent(tenant_id, timeline_id).at(now, 80),
]
);
@@ -213,157 +183,6 @@ fn post_restart_written_sizes_with_rolled_back_last_record_lsn() {
&[
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(now, later, 0),
MetricsKey::written_size(tenant_id, timeline_id).at(later, 100),
MetricsKey::written_size_since_parent(tenant_id, timeline_id).at(later, 100),
MetricsKey::pitr_history_size_since_parent(tenant_id, timeline_id).at(later, 80),
]
);
}
/// Tests that written sizes do not regress across restarts, even on child branches.
#[test]
fn post_restart_written_sizes_with_rolled_back_last_record_lsn_and_ancestor_lsn() {
let tenant_id = TenantId::generate();
let timeline_id = TimelineId::generate();
let [later, now, at_restart] = time_backwards();
// FIXME: tests would be so much easier if we did not need to juggle back and forth
// SystemTime and DateTime::<Utc> ... Could do the conversion only at upload time?
let now = DateTime::<Utc>::from(now);
let later = DateTime::<Utc>::from(later);
let before_restart = at_restart - std::time::Duration::from_secs(5 * 60);
let way_before = before_restart - std::time::Duration::from_secs(10 * 60);
let before_restart = DateTime::<Utc>::from(before_restart);
let way_before = DateTime::<Utc>::from(way_before);
let snap = TimelineSnapshot {
loaded_at: (Lsn(50), at_restart),
last_record_lsn: Lsn(50),
ancestor_lsn: Lsn(40),
current_exact_logical_size: None,
pitr_enabled: true,
pitr_cutoff: Some(Lsn(20)),
};
let mut cache = HashMap::from([
MetricsKey::written_size(tenant_id, timeline_id)
.at(before_restart, 100)
.to_kv_pair(),
MetricsKey::written_size_delta(tenant_id, timeline_id)
.from_until(
way_before,
before_restart,
// not taken into account, but the timestamps are important
999_999_999,
)
.to_kv_pair(),
]);
let mut metrics = Vec::new();
snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache);
assert_eq!(
metrics,
&[
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(
before_restart,
now,
0
),
MetricsKey::written_size(tenant_id, timeline_id).at(now, 100),
MetricsKey::written_size_since_parent(tenant_id, timeline_id).at(now, 60),
MetricsKey::pitr_history_size_since_parent(tenant_id, timeline_id).at(now, 60),
]
);
// now if we cache these metrics, and re-run while "still in recovery"
cache.extend(metrics.drain(..).map(|x| x.to_kv_pair()));
// "still in recovery", because our snapshot did not change
snap.to_metrics(tenant_id, timeline_id, later, &mut metrics, &cache);
assert_eq!(
metrics,
&[
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(now, later, 0),
MetricsKey::written_size(tenant_id, timeline_id).at(later, 100),
MetricsKey::written_size_since_parent(tenant_id, timeline_id).at(later, 60),
MetricsKey::pitr_history_size_since_parent(tenant_id, timeline_id).at(later, 60),
]
);
}
/// Tests that written sizes do not regress across restarts, even on child branches and
/// with a PITR cutoff after the branch point.
#[test]
fn post_restart_written_sizes_with_rolled_back_last_record_lsn_and_ancestor_lsn_and_pitr_cutoff() {
let tenant_id = TenantId::generate();
let timeline_id = TimelineId::generate();
let [later, now, at_restart] = time_backwards();
// FIXME: tests would be so much easier if we did not need to juggle back and forth
// SystemTime and DateTime::<Utc> ... Could do the conversion only at upload time?
let now = DateTime::<Utc>::from(now);
let later = DateTime::<Utc>::from(later);
let before_restart = at_restart - std::time::Duration::from_secs(5 * 60);
let way_before = before_restart - std::time::Duration::from_secs(10 * 60);
let before_restart = DateTime::<Utc>::from(before_restart);
let way_before = DateTime::<Utc>::from(way_before);
let snap = TimelineSnapshot {
loaded_at: (Lsn(50), at_restart),
last_record_lsn: Lsn(50),
ancestor_lsn: Lsn(30),
current_exact_logical_size: None,
pitr_enabled: true,
pitr_cutoff: Some(Lsn(40)),
};
let mut cache = HashMap::from([
MetricsKey::written_size(tenant_id, timeline_id)
.at(before_restart, 100)
.to_kv_pair(),
MetricsKey::written_size_delta(tenant_id, timeline_id)
.from_until(
way_before,
before_restart,
// not taken into account, but the timestamps are important
999_999_999,
)
.to_kv_pair(),
]);
let mut metrics = Vec::new();
snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache);
assert_eq!(
metrics,
&[
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(
before_restart,
now,
0
),
MetricsKey::written_size(tenant_id, timeline_id).at(now, 100),
MetricsKey::written_size_since_parent(tenant_id, timeline_id).at(now, 70),
MetricsKey::pitr_history_size_since_parent(tenant_id, timeline_id).at(now, 60),
]
);
// now if we cache these metrics, and re-run while "still in recovery"
cache.extend(metrics.drain(..).map(|x| x.to_kv_pair()));
// "still in recovery", because our snapshot did not change
snap.to_metrics(tenant_id, timeline_id, later, &mut metrics, &cache);
assert_eq!(
metrics,
&[
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(now, later, 0),
MetricsKey::written_size(tenant_id, timeline_id).at(later, 100),
MetricsKey::written_size_since_parent(tenant_id, timeline_id).at(later, 70),
MetricsKey::pitr_history_size_since_parent(tenant_id, timeline_id).at(later, 60),
]
);
}
@@ -382,10 +201,7 @@ fn post_restart_current_exact_logical_size_uses_cached() {
let snap = TimelineSnapshot {
loaded_at: (Lsn(50), at_restart),
last_record_lsn: Lsn(50),
ancestor_lsn: Lsn(0),
current_exact_logical_size: None,
pitr_enabled: true,
pitr_cutoff: None,
};
let cache = HashMap::from([MetricsKey::timeline_logical_size(tenant_id, timeline_id)
@@ -470,101 +286,16 @@ fn time_backwards<const N: usize>() -> [std::time::SystemTime; N] {
times
}
/// Tests that disabled PITR history does not yield any history size, even when the PITR cutoff
/// indicates otherwise.
#[test]
fn pitr_disabled_yields_no_history_size() {
let tenant_id = TenantId::generate();
let timeline_id = TimelineId::generate();
let mut metrics = Vec::new();
let cache = HashMap::new();
let initdb_lsn = Lsn(0x10000);
let pitr_cutoff = Lsn(0x11000);
let disk_consistent_lsn = Lsn(initdb_lsn.0 * 2);
let snap = TimelineSnapshot {
loaded_at: (disk_consistent_lsn, SystemTime::now()),
last_record_lsn: disk_consistent_lsn,
ancestor_lsn: Lsn(0),
current_exact_logical_size: None,
pitr_enabled: false,
pitr_cutoff: Some(pitr_cutoff),
};
let now = DateTime::<Utc>::from(SystemTime::now());
snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache);
assert_eq!(
metrics,
&[
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(
snap.loaded_at.1.into(),
now,
0
),
MetricsKey::written_size(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
MetricsKey::written_size_since_parent(tenant_id, timeline_id)
.at(now, disk_consistent_lsn.0),
MetricsKey::pitr_history_size_since_parent(tenant_id, timeline_id).at(now, 0),
]
);
}
/// Tests that uninitialized PITR cutoff does not emit any history size metric at all.
#[test]
fn pitr_uninitialized_does_not_emit_history_size() {
let tenant_id = TenantId::generate();
let timeline_id = TimelineId::generate();
let mut metrics = Vec::new();
let cache = HashMap::new();
let initdb_lsn = Lsn(0x10000);
let disk_consistent_lsn = Lsn(initdb_lsn.0 * 2);
let snap = TimelineSnapshot {
loaded_at: (disk_consistent_lsn, SystemTime::now()),
last_record_lsn: disk_consistent_lsn,
ancestor_lsn: Lsn(0),
current_exact_logical_size: None,
pitr_enabled: true,
pitr_cutoff: None,
};
let now = DateTime::<Utc>::from(SystemTime::now());
snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache);
assert_eq!(
metrics,
&[
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(
snap.loaded_at.1.into(),
now,
0
),
MetricsKey::written_size(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
MetricsKey::written_size_since_parent(tenant_id, timeline_id)
.at(now, disk_consistent_lsn.0),
]
);
}
pub(crate) const fn metric_examples_old(
tenant_id: TenantId,
timeline_id: TimelineId,
now: DateTime<Utc>,
before: DateTime<Utc>,
) -> [RawMetric; 7] {
) -> [RawMetric; 5] {
[
MetricsKey::written_size(tenant_id, timeline_id).at_old_format(now, 0),
MetricsKey::written_size_delta(tenant_id, timeline_id)
.from_until_old_format(before, now, 0),
MetricsKey::written_size_since_parent(tenant_id, timeline_id).at_old_format(now, 0),
MetricsKey::pitr_history_size_since_parent(tenant_id, timeline_id).at_old_format(now, 0),
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at_old_format(now, 0),
MetricsKey::remote_storage_size(tenant_id).at_old_format(now, 0),
MetricsKey::synthetic_size(tenant_id).at_old_format(now, 1),
@@ -576,12 +307,10 @@ pub(crate) const fn metric_examples(
timeline_id: TimelineId,
now: DateTime<Utc>,
before: DateTime<Utc>,
) -> [NewRawMetric; 7] {
) -> [NewRawMetric; 5] {
[
MetricsKey::written_size(tenant_id, timeline_id).at(now, 0),
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(before, now, 0),
MetricsKey::written_size_since_parent(tenant_id, timeline_id).at(now, 0),
MetricsKey::pitr_history_size_since_parent(tenant_id, timeline_id).at(now, 0),
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0),
MetricsKey::remote_storage_size(tenant_id).at(now, 0),
MetricsKey::synthetic_size(tenant_id).at(now, 1),

View File

@@ -513,14 +513,6 @@ mod tests {
line!(),
r#"{"type":"incremental","start_time":"2023-09-14T00:00:00.123456789Z","stop_time":"2023-09-15T00:00:00.123456789Z","metric":"written_data_bytes_delta","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
),
(
line!(),
r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"written_size_since_parent","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
),
(
line!(),
r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"pitr_history_size_since_parent","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
),
(
line!(),
r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"timeline_logical_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
@@ -568,7 +560,7 @@ mod tests {
assert_eq!(upgraded_samples, new_samples);
}
fn metric_samples_old() -> [RawMetric; 7] {
fn metric_samples_old() -> [RawMetric; 5] {
let tenant_id = TenantId::from_array([0; 16]);
let timeline_id = TimelineId::from_array([0xff; 16]);
@@ -580,7 +572,7 @@ mod tests {
super::super::metrics::metric_examples_old(tenant_id, timeline_id, now, before)
}
fn metric_samples() -> [NewRawMetric; 7] {
fn metric_samples() -> [NewRawMetric; 5] {
let tenant_id = TenantId::from_array([0; 16]);
let timeline_id = TimelineId::from_array([0xff; 16]);

View File

@@ -3,7 +3,6 @@
mod auth;
pub mod basebackup;
pub mod basebackup_cache;
pub mod config;
pub mod consumption_metrics;
pub mod context;
@@ -84,7 +83,6 @@ pub async fn shutdown_pageserver(
http_listener: HttpEndpointListener,
https_listener: Option<HttpsEndpointListener>,
page_service: page_service::Listener,
grpc_task: Option<CancellableTask>,
consumption_metrics_worker: ConsumptionMetricsTasks,
disk_usage_eviction_task: Option<DiskUsageEvictionTask>,
tenant_manager: &TenantManager,
@@ -178,16 +176,6 @@ pub async fn shutdown_pageserver(
)
.await;
// Shut down the gRPC server task, including request handlers.
if let Some(grpc_task) = grpc_task {
timed(
grpc_task.shutdown(),
"shutdown gRPC PageRequestHandler",
Duration::from_secs(3),
)
.await;
}
// Shut down all the tenants. This flushes everything to disk and kills
// the checkpoint and GC tasks.
timed(

View File

@@ -4359,42 +4359,6 @@ pub(crate) fn set_tokio_runtime_setup(setup: &str, num_threads: NonZeroUsize) {
.set(u64::try_from(num_threads.get()).unwrap());
}
pub(crate) static BASEBACKUP_CACHE_READ: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_basebackup_cache_read_total",
"Number of read accesses to the basebackup cache grouped by hit/miss/error",
&["result"]
)
.expect("failed to define a metric")
});
pub(crate) static BASEBACKUP_CACHE_PREPARE: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_basebackup_cache_prepare_total",
"Number of prepare requests processed by the basebackup cache grouped by ok/skip/error",
&["result"]
)
.expect("failed to define a metric")
});
pub(crate) static BASEBACKUP_CACHE_ENTRIES: Lazy<IntGauge> = Lazy::new(|| {
register_int_gauge!(
"pageserver_basebackup_cache_entries_total",
"Number of entries in the basebackup cache"
)
.expect("failed to define a metric")
});
// FIXME: Support basebackup cache size metrics.
#[allow(dead_code)]
pub(crate) static BASEBACKUP_CACHE_SIZE: Lazy<IntGauge> = Lazy::new(|| {
register_int_gauge!(
"pageserver_basebackup_cache_size_bytes",
"Total size of all basebackup cache entries on disk in bytes"
)
.expect("failed to define a metric")
});
static PAGESERVER_CONFIG_IGNORED_ITEMS: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"pageserver_config_ignored_items",

View File

@@ -4,19 +4,18 @@
use std::borrow::Cow;
use std::num::NonZeroUsize;
use std::os::fd::AsRawFd;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};
use std::{io, str};
use crate::PERF_TRACE_TARGET;
use anyhow::{Context, bail};
use async_compression::tokio::write::GzipEncoder;
use bytes::Buf;
use futures::{FutureExt, Stream, StreamExt as _};
use futures::FutureExt;
use itertools::Itertools;
use jsonwebtoken::TokenData;
use nix::sys::socket::{setsockopt, sockopt};
use once_cell::sync::OnceCell;
use pageserver_api::config::{
GetVectoredConcurrentIo, PageServicePipeliningConfig, PageServicePipeliningConfigPipelined,
@@ -32,7 +31,6 @@ use pageserver_api::models::{
};
use pageserver_api::reltag::SlruKind;
use pageserver_api::shard::TenantShardId;
use pageserver_page_api::proto;
use postgres_backend::{
AuthType, PostgresBackend, PostgresBackendReader, QueryError, is_expected_io_error,
};
@@ -43,8 +41,6 @@ use pq_proto::{BeMessage, FeMessage, FeStartupPacket, RowDescriptor};
use strum_macros::IntoStaticStr;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, BufWriter};
use tokio::task::JoinHandle;
use tokio_rustls::TlsAcceptor;
use tokio_stream::wrappers::TcpListenerStream;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::auth::{Claims, Scope, SwappableJwtAuth};
@@ -57,8 +53,7 @@ use utils::sync::gate::{Gate, GateGuard};
use utils::sync::spsc_fold;
use crate::auth::check_permission;
use crate::basebackup::{self, BasebackupError};
use crate::basebackup_cache::BasebackupCache;
use crate::basebackup::BasebackupError;
use crate::config::PageServerConf;
use crate::context::{
DownloadBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
@@ -72,14 +67,14 @@ use crate::span::{
debug_assert_current_span_has_tenant_and_timeline_id,
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id,
};
use crate::task_mgr::{COMPUTE_REQUEST_RUNTIME, TaskKind, exit_on_panic_or_error};
use crate::task_mgr::{self, COMPUTE_REQUEST_RUNTIME, TaskKind};
use crate::tenant::mgr::{
GetActiveTenantError, GetTenantError, ShardResolveResult, ShardSelector, TenantManager,
};
use crate::tenant::storage_layer::IoConcurrency;
use crate::tenant::timeline::{self, WaitLsnError};
use crate::tenant::{GetTimelineError, PageReconstructError, Timeline};
use crate::{CancellableTask, PERF_TRACE_TARGET, timed_after_cancellation};
use crate::{basebackup, timed_after_cancellation};
/// How long we may wait for a [`crate::tenant::mgr::TenantSlot::InProgress`]` and/or a [`crate::tenant::TenantShard`] which
/// is not yet in state [`TenantState::Active`].
@@ -90,25 +85,6 @@ const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000);
/// Threshold at which to log slow GetPage requests.
const LOG_SLOW_GETPAGE_THRESHOLD: Duration = Duration::from_secs(30);
/// Whether to enable TCP keepalives for gRPC connections. The interval and
/// timeouts are configured via sysctl. This detects dead connections sooner.
const GRPC_TCP_KEEPALIVE: bool = true;
/// Whether to enable TCP nodelay for gRPC connections. This disables Nagle's
/// algorithm, which can cause latency spikes for small messages.
const GRPC_TCP_NODELAY: bool = true;
/// The interval between HTTP2 keepalive pings. This allows shutting down server
/// tasks when clients are unresponsive.
const GRPC_HTTP2_KEEPALIVE_INTERVAL: Duration = Duration::from_secs(30);
/// The timeout for HTTP2 keepalive pings. Should be <= GRPC_KEEPALIVE_INTERVAL.
const GRPC_HTTP2_KEEPALIVE_TIMEOUT: Duration = Duration::from_secs(20);
/// Number of concurrent gRPC streams per TCP connection. We expect something
/// like 8 GetPage streams per connections, plus any unary requests.
const GRPC_MAX_CONCURRENT_STREAMS: u32 = 256;
///////////////////////////////////////////////////////////////////////////////
pub struct Listener {
@@ -131,7 +107,6 @@ pub fn spawn(
perf_trace_dispatch: Option<Dispatch>,
tcp_listener: tokio::net::TcpListener,
tls_config: Option<Arc<rustls::ServerConfig>>,
basebackup_cache: Arc<BasebackupCache>,
) -> Listener {
let cancel = CancellationToken::new();
let libpq_ctx = RequestContext::todo_child(
@@ -142,7 +117,7 @@ pub fn spawn(
// accept connections.)
DownloadBehavior::Error,
);
let task = COMPUTE_REQUEST_RUNTIME.spawn(exit_on_panic_or_error(
let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
"libpq listener",
libpq_listener_main(
conf,
@@ -153,7 +128,6 @@ pub fn spawn(
conf.pg_auth_type,
tls_config,
conf.page_service_pipelining.clone(),
basebackup_cache,
libpq_ctx,
cancel.clone(),
)
@@ -163,109 +137,6 @@ pub fn spawn(
Listener { cancel, task }
}
/// Spawns a gRPC server for the page service.
pub fn spawn_grpc(
conf: &'static PageServerConf,
tenant_manager: Arc<TenantManager>,
auth: Option<Arc<SwappableJwtAuth>>,
perf_trace_dispatch: Option<Dispatch>,
listener: std::net::TcpListener,
tls_config: Option<Arc<rustls::ServerConfig>>,
basebackup_cache: Arc<BasebackupCache>,
) -> anyhow::Result<CancellableTask> {
// Use the compute runtime.
let _runtime = COMPUTE_REQUEST_RUNTIME.enter();
let cancel = CancellationToken::new();
let ctx = RequestContextBuilder::new(TaskKind::PageRequestHandler)
.download_behavior(DownloadBehavior::Download)
.perf_span_dispatch(perf_trace_dispatch)
.detached_child();
let gate = Gate::default();
// Set up the gRPC server.
//
// NB: does not respect TCP settings, since we configure the socket manually.
// TODO: consider tuning window sizes.
// TODO: wire up tracing.
let mut server = tonic::transport::Server::builder()
.http2_keepalive_interval(Some(GRPC_HTTP2_KEEPALIVE_INTERVAL))
.http2_keepalive_timeout(Some(GRPC_HTTP2_KEEPALIVE_TIMEOUT))
.max_concurrent_streams(Some(GRPC_MAX_CONCURRENT_STREAMS));
// Main page service.
let page_service = proto::PageServiceServer::new(PageServerHandler::new(
tenant_manager,
auth,
PageServicePipeliningConfig::Serial, // TODO: unused with gRPC
conf.get_vectored_concurrent_io,
ConnectionPerfSpanFields::default(),
basebackup_cache,
ctx,
cancel.clone(),
gate.enter().expect("just created"),
));
let server = server.add_service(page_service);
// Reflection service for use with e.g. grpcurl.
let reflection_service = tonic_reflection::server::Builder::configure()
.register_encoded_file_descriptor_set(proto::FILE_DESCRIPTOR_SET)
.build_v1()?;
let server = server.add_service(reflection_service);
// Set up the TCP socket. We take a preconfigured TcpListener to bind the port early.
listener.set_nonblocking(true)?;
setsockopt(&listener, sockopt::KeepAlive, &GRPC_TCP_KEEPALIVE)?;
let listener = tokio::net::TcpListener::from_std(listener)?;
// Build the serve future.
let cancel_serve = cancel.clone();
let serve = async move {
// Accept TCP connections.
let tcp_conns = TcpListenerStream::new(listener).map(|result| {
let tcp_conn = result.inspect_err(|err| error!("TCP accept failed: {err}"))?;
tcp_conn.set_nodelay(GRPC_TCP_NODELAY).inspect_err(|err| {
error!("TCP nodelay failed: {err}");
})?;
Ok(tcp_conn)
});
if let Some(tls_config) = tls_config {
// If TLS is enabled, decrypt the TCP streams before passing them to the server.
let tls_acceptor = TlsAcceptor::from(tls_config);
let tls_conns = async_stream::stream! {
for await result in tcp_conns {
match result {
Ok(tcp_conn) => yield tls_acceptor
.accept(tcp_conn)
.await
.inspect_err(|err| error!("TLS handshake failed: {err}")),
Err(err) => yield Err(err),
}
}
};
server
.serve_with_incoming_shutdown(tls_conns, cancel_serve.cancelled())
.await?;
} else {
// Otherwise, just pass the plaintext TCP streams.
server
.serve_with_incoming_shutdown(tcp_conns, cancel_serve.cancelled())
.await?;
}
// Clean shutdown, wait for tasks to finish.
// TODO: revisit shutdown logic once page service is implemented.
gate.close().await;
anyhow::Ok(())
};
// Spawn a task to run the serve future.
let task = tokio::spawn(exit_on_panic_or_error("grpc listener", serve));
Ok(CancellableTask { task, cancel })
}
impl Listener {
pub async fn stop_accepting(self) -> Connections {
self.cancel.cancel();
@@ -315,7 +186,6 @@ pub async fn libpq_listener_main(
auth_type: AuthType,
tls_config: Option<Arc<rustls::ServerConfig>>,
pipelining_config: PageServicePipeliningConfig,
basebackup_cache: Arc<BasebackupCache>,
listener_ctx: RequestContext,
listener_cancel: CancellationToken,
) -> Connections {
@@ -359,7 +229,6 @@ pub async fn libpq_listener_main(
auth_type,
tls_config.clone(),
pipelining_config.clone(),
Arc::clone(&basebackup_cache),
connection_ctx,
connections_cancel.child_token(),
gate_guard,
@@ -385,7 +254,7 @@ type ConnectionHandlerResult = anyhow::Result<()>;
/// Perf root spans start at the per-request level, after shard routing.
/// This struct carries connection-level information to the root perf span definition.
#[derive(Clone, Default)]
#[derive(Clone)]
struct ConnectionPerfSpanFields {
peer_addr: String,
application_name: Option<String>,
@@ -402,7 +271,6 @@ async fn page_service_conn_main(
auth_type: AuthType,
tls_config: Option<Arc<rustls::ServerConfig>>,
pipelining_config: PageServicePipeliningConfig,
basebackup_cache: Arc<BasebackupCache>,
connection_ctx: RequestContext,
cancel: CancellationToken,
gate_guard: GateGuard,
@@ -468,7 +336,6 @@ async fn page_service_conn_main(
pipelining_config,
conf.get_vectored_concurrent_io,
perf_span_fields,
basebackup_cache,
connection_ctx,
cancel.clone(),
gate_guard,
@@ -503,11 +370,6 @@ async fn page_service_conn_main(
}
}
/// Page service connection handler.
///
/// TODO: for gRPC, this will be shared by all requests from all connections.
/// Decompose it into global state and per-connection/request state, and make
/// libpq-specific options (e.g. pipelining) separate.
struct PageServerHandler {
auth: Option<Arc<SwappableJwtAuth>>,
claims: Option<Claims>,
@@ -528,8 +390,6 @@ struct PageServerHandler {
pipelining_config: PageServicePipeliningConfig,
get_vectored_concurrent_io: GetVectoredConcurrentIo,
basebackup_cache: Arc<BasebackupCache>,
gate_guard: GateGuard,
}
@@ -989,7 +849,6 @@ impl PageServerHandler {
pipelining_config: PageServicePipeliningConfig,
get_vectored_concurrent_io: GetVectoredConcurrentIo,
perf_span_fields: ConnectionPerfSpanFields,
basebackup_cache: Arc<BasebackupCache>,
connection_ctx: RequestContext,
cancel: CancellationToken,
gate_guard: GateGuard,
@@ -1003,7 +862,6 @@ impl PageServerHandler {
cancel,
pipelining_config,
get_vectored_concurrent_io,
basebackup_cache,
gate_guard,
}
}
@@ -2635,8 +2493,6 @@ impl PageServerHandler {
.map_err(QueryError::Disconnected)?;
self.flush_cancellable(pgb, &self.cancel).await?;
let mut from_cache = false;
// Send a tarball of the latest layer on the timeline. Compress if not
// fullbackup. TODO Compress in that case too (tests need to be updated)
if full_backup {
@@ -2654,33 +2510,7 @@ impl PageServerHandler {
.map_err(map_basebackup_error)?;
} else {
let mut writer = BufWriter::new(pgb.copyout_writer());
let cached = {
// Basebackup is cached only for this combination of parameters.
if timeline.is_basebackup_cache_enabled()
&& gzip
&& lsn.is_some()
&& prev_lsn.is_none()
{
self.basebackup_cache
.get(tenant_id, timeline_id, lsn.unwrap())
.await
} else {
None
}
};
if let Some(mut cached) = cached {
from_cache = true;
tokio::io::copy(&mut cached, &mut writer)
.await
.map_err(|e| {
map_basebackup_error(BasebackupError::Client(
e,
"handle_basebackup_request,cached,copy",
))
})?;
} else if gzip {
if gzip {
let mut encoder = GzipEncoder::with_quality(
&mut writer,
// NOTE using fast compression because it's on the critical path
@@ -2739,7 +2569,6 @@ impl PageServerHandler {
info!(
lsn_await_millis = lsn_awaited_after.as_millis(),
basebackup_millis = basebackup_after.as_millis(),
%from_cache,
"basebackup complete"
);
@@ -3248,60 +3077,6 @@ where
}
}
/// Implements the page service over gRPC.
///
/// TODO: not yet implemented, all methods return unimplemented.
#[tonic::async_trait]
impl proto::PageService for PageServerHandler {
type GetBaseBackupStream = Pin<
Box<dyn Stream<Item = Result<proto::GetBaseBackupResponseChunk, tonic::Status>> + Send>,
>;
type GetPagesStream =
Pin<Box<dyn Stream<Item = Result<proto::GetPageResponse, tonic::Status>> + Send>>;
async fn check_rel_exists(
&self,
_: tonic::Request<proto::CheckRelExistsRequest>,
) -> Result<tonic::Response<proto::CheckRelExistsResponse>, tonic::Status> {
Err(tonic::Status::unimplemented("not implemented"))
}
async fn get_base_backup(
&self,
_: tonic::Request<proto::GetBaseBackupRequest>,
) -> Result<tonic::Response<Self::GetBaseBackupStream>, tonic::Status> {
Err(tonic::Status::unimplemented("not implemented"))
}
async fn get_db_size(
&self,
_: tonic::Request<proto::GetDbSizeRequest>,
) -> Result<tonic::Response<proto::GetDbSizeResponse>, tonic::Status> {
Err(tonic::Status::unimplemented("not implemented"))
}
async fn get_pages(
&self,
_: tonic::Request<tonic::Streaming<proto::GetPageRequest>>,
) -> Result<tonic::Response<Self::GetPagesStream>, tonic::Status> {
Err(tonic::Status::unimplemented("not implemented"))
}
async fn get_rel_size(
&self,
_: tonic::Request<proto::GetRelSizeRequest>,
) -> Result<tonic::Response<proto::GetRelSizeResponse>, tonic::Status> {
Err(tonic::Status::unimplemented("not implemented"))
}
async fn get_slru_segment(
&self,
_: tonic::Request<proto::GetSlruSegmentRequest>,
) -> Result<tonic::Response<proto::GetSlruSegmentResponse>, tonic::Status> {
Err(tonic::Status::unimplemented("not implemented"))
}
}
impl From<GetActiveTenantError> for QueryError {
fn from(e: GetActiveTenantError) -> Self {
match e {

View File

@@ -276,10 +276,9 @@ pub enum TaskKind {
// HTTP endpoint listener.
HttpEndpointListener,
/// Task that handles a single page service connection. A PageRequestHandler
/// task starts detached from any particular tenant or timeline, but it can
/// be associated with one later, after receiving a command from the client.
/// Also used for the gRPC page service API, including the main server task.
// Task that handles a single connection. A PageRequestHandler task
// starts detached from any particular tenant or timeline, but it can be
// associated with one later, after receiving a command from the client.
PageRequestHandler,
/// Manages the WAL receiver connection for one timeline.
@@ -381,10 +380,6 @@ pub enum TaskKind {
DetachAncestor,
ImportPgdata,
/// Background task of [`crate::basebackup_cache::BasebackupCache`].
/// Prepares basebackups and clears outdated entries.
BasebackupCache,
}
#[derive(Default)]

View File

@@ -78,7 +78,6 @@ use self::timeline::uninit::{TimelineCreateGuard, TimelineExclusionError, Uninit
use self::timeline::{
EvictionTaskTenantState, GcCutoffs, TimelineDeleteProgress, TimelineResources, WaitLsnError,
};
use crate::basebackup_cache::BasebackupPrepareSender;
use crate::config::PageServerConf;
use crate::context;
use crate::context::RequestContextBuilder;
@@ -158,7 +157,6 @@ pub struct TenantSharedResources {
pub remote_storage: GenericRemoteStorage,
pub deletion_queue_client: DeletionQueueClient,
pub l0_flush_global_state: L0FlushGlobalState,
pub basebackup_prepare_sender: BasebackupPrepareSender,
}
/// A [`TenantShard`] is really an _attached_ tenant. The configuration
@@ -319,15 +317,12 @@ pub struct TenantShard {
gc_cs: tokio::sync::Mutex<()>,
walredo_mgr: Option<Arc<WalRedoManager>>,
/// Provides access to timeline data sitting in the remote storage.
// provides access to timeline data sitting in the remote storage
pub(crate) remote_storage: GenericRemoteStorage,
/// Access to global deletion queue for when this tenant wants to schedule a deletion.
// Access to global deletion queue for when this tenant wants to schedule a deletion
deletion_queue_client: DeletionQueueClient,
/// A channel to send async requests to prepare a basebackup for the basebackup cache.
basebackup_prepare_sender: BasebackupPrepareSender,
/// Cached logical sizes updated updated on each [`TenantShard::gather_size_inputs`].
cached_logical_sizes: tokio::sync::Mutex<HashMap<(TimelineId, Lsn), u64>>,
cached_synthetic_tenant_size: Arc<AtomicU64>,
@@ -1291,7 +1286,6 @@ impl TenantShard {
remote_storage,
deletion_queue_client,
l0_flush_global_state,
basebackup_prepare_sender,
} = resources;
let attach_mode = attached_conf.location.attach_mode;
@@ -1307,7 +1301,6 @@ impl TenantShard {
remote_storage.clone(),
deletion_queue_client,
l0_flush_global_state,
basebackup_prepare_sender,
));
// The attach task will carry a GateGuard, so that shutdown() reliably waits for it to drop out if
@@ -4246,7 +4239,6 @@ impl TenantShard {
remote_storage: GenericRemoteStorage,
deletion_queue_client: DeletionQueueClient,
l0_flush_global_state: L0FlushGlobalState,
basebackup_prepare_sender: BasebackupPrepareSender,
) -> TenantShard {
assert!(!attached_conf.location.generation.is_none());
@@ -4350,7 +4342,6 @@ impl TenantShard {
ongoing_timeline_detach: std::sync::Mutex::default(),
gc_block: Default::default(),
l0_flush_global_state,
basebackup_prepare_sender,
}
}
@@ -5270,7 +5261,6 @@ impl TenantShard {
pagestream_throttle_metrics: self.pagestream_throttle_metrics.clone(),
l0_compaction_trigger: self.l0_compaction_trigger.clone(),
l0_flush_global_state: self.l0_flush_global_state.clone(),
basebackup_prepare_sender: self.basebackup_prepare_sender.clone(),
}
}
@@ -5853,8 +5843,6 @@ pub(crate) mod harness {
) -> anyhow::Result<Arc<TenantShard>> {
let walredo_mgr = Arc::new(WalRedoManager::from(TestRedoManager));
let (basebackup_requst_sender, _) = tokio::sync::mpsc::unbounded_channel();
let tenant = Arc::new(TenantShard::new(
TenantState::Attaching,
self.conf,
@@ -5872,7 +5860,6 @@ pub(crate) mod harness {
self.deletion_queue.new_client(),
// TODO: ideally we should run all unit tests with both configs
L0FlushGlobalState::new(L0FlushConfig::default()),
basebackup_requst_sender,
));
let preload = tenant

View File

@@ -24,6 +24,8 @@ use std::sync::atomic::{AtomicBool, AtomicU64, Ordering as AtomicOrdering};
use std::sync::{Arc, Mutex, OnceLock, RwLock, Weak};
use std::time::{Duration, Instant, SystemTime};
use crate::PERF_TRACE_TARGET;
use crate::walredo::RedoAttemptType;
use anyhow::{Context, Result, anyhow, bail, ensure};
use arc_swap::{ArcSwap, ArcSwapOption};
use bytes::Bytes;
@@ -92,12 +94,10 @@ use super::storage_layer::{LayerFringe, LayerVisibilityHint, ReadableLayer};
use super::tasks::log_compaction_error;
use super::upload_queue::NotInitialized;
use super::{
AttachedTenantConf, BasebackupPrepareSender, GcError, HeatMapTimeline, MaybeOffloaded,
AttachedTenantConf, GcError, HeatMapTimeline, MaybeOffloaded,
debug_assert_current_span_has_tenant_and_timeline_id,
};
use crate::PERF_TRACE_TARGET;
use crate::aux_file::AuxFileSizeEstimator;
use crate::basebackup_cache::BasebackupPrepareRequest;
use crate::config::PageServerConf;
use crate::context::{
DownloadBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
@@ -131,7 +131,6 @@ use crate::tenant::tasks::BackgroundLoopKind;
use crate::tenant::timeline::logical_size::CurrentLogicalSize;
use crate::virtual_file::{MaybeFatalIo, VirtualFile};
use crate::walingest::WalLagCooldown;
use crate::walredo::RedoAttemptType;
use crate::{ZERO_PAGE, task_mgr, walredo};
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
@@ -197,7 +196,6 @@ pub struct TimelineResources {
pub pagestream_throttle_metrics: Arc<crate::metrics::tenant_throttling::Pagestream>,
pub l0_compaction_trigger: Arc<Notify>,
pub l0_flush_global_state: l0_flush::L0FlushGlobalState,
pub basebackup_prepare_sender: BasebackupPrepareSender,
}
pub struct Timeline {
@@ -441,9 +439,6 @@ pub struct Timeline {
pub(crate) rel_size_v2_status: ArcSwapOption<RelSizeMigration>,
wait_lsn_log_slow: tokio::sync::Semaphore,
/// A channel to send async requests to prepare a basebackup for the basebackup cache.
basebackup_prepare_sender: BasebackupPrepareSender,
}
pub(crate) enum PreviousHeatmap {
@@ -1033,7 +1028,6 @@ pub(crate) enum WaitLsnWaiter<'a> {
Tenant,
PageService,
HttpEndpoint,
BaseBackupCache,
}
/// Argument to [`Timeline::shutdown`].
@@ -1560,8 +1554,7 @@ impl Timeline {
}
WaitLsnWaiter::Tenant
| WaitLsnWaiter::PageService
| WaitLsnWaiter::HttpEndpoint
| WaitLsnWaiter::BaseBackupCache => unreachable!(
| WaitLsnWaiter::HttpEndpoint => unreachable!(
"tenant or page_service context are not expected to have task kind {:?}",
ctx.task_kind()
),
@@ -2466,41 +2459,6 @@ impl Timeline {
false
}
}
pub(crate) fn is_basebackup_cache_enabled(&self) -> bool {
let tenant_conf = self.tenant_conf.load();
tenant_conf
.tenant_conf
.basebackup_cache_enabled
.unwrap_or(self.conf.default_tenant_conf.basebackup_cache_enabled)
}
/// Prepare basebackup for the given LSN and store it in the basebackup cache.
/// The method is asynchronous and returns immediately.
/// The actual basebackup preparation is performed in the background
/// by the basebackup cache on a best-effort basis.
pub(crate) fn prepare_basebackup(&self, lsn: Lsn) {
if !self.is_basebackup_cache_enabled() {
return;
}
if !self.tenant_shard_id.is_shard_zero() {
// In theory we should never get here, but just in case check it.
// Preparing basebackup doesn't make sense for shards other than shard zero.
return;
}
let res = self
.basebackup_prepare_sender
.send(BasebackupPrepareRequest {
tenant_shard_id: self.tenant_shard_id,
timeline_id: self.timeline_id,
lsn,
});
if let Err(e) = res {
// May happen during shutdown, it's not critical.
info!("Failed to send shutdown checkpoint: {e:#}");
}
}
}
/// Number of times we will compute partition within a checkpoint distance.
@@ -2578,13 +2536,6 @@ impl Timeline {
.unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout)
}
pub(crate) fn get_pitr_interval(&self) -> Duration {
let tenant_conf = &self.tenant_conf.load().tenant_conf;
tenant_conf
.pitr_interval
.unwrap_or(self.conf.default_tenant_conf.pitr_interval)
}
fn get_compaction_period(&self) -> Duration {
let tenant_conf = self.tenant_conf.load().tenant_conf.clone();
tenant_conf
@@ -3070,8 +3021,6 @@ impl Timeline {
rel_size_v2_status: ArcSwapOption::from_pointee(rel_size_v2_status),
wait_lsn_log_slow: tokio::sync::Semaphore::new(1),
basebackup_prepare_sender: resources.basebackup_prepare_sender,
};
result.repartition_threshold =

View File

@@ -1316,10 +1316,6 @@ impl WalIngest {
}
});
if info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN {
modification.tline.prepare_basebackup(lsn);
}
Ok(())
}

View File

@@ -717,7 +717,7 @@ prefetch_read(PrefetchRequest *slot)
Assert(slot->status == PRFS_REQUESTED);
Assert(slot->response == NULL);
Assert(slot->my_ring_index == MyPState->ring_receive);
Assert(readpage_reentrant_guard || AmPrewarmWorker);
Assert(readpage_reentrant_guard);
if (slot->status != PRFS_REQUESTED ||
slot->response != NULL ||
@@ -800,7 +800,7 @@ communicator_prefetch_receive(BufferTag tag)
PrfHashEntry *entry;
PrefetchRequest hashkey;
Assert(readpage_reentrant_guard || AmPrewarmWorker); /* do not pump prefetch state in prewarm worker */
Assert(readpage_reentrant_guard);
hashkey.buftag = tag;
entry = prfh_lookup(MyPState->prf_hash, &hashkey);
if (entry != NULL && prefetch_wait_for(entry->slot->my_ring_index))
@@ -2450,7 +2450,6 @@ void
communicator_reconfigure_timeout_if_needed(void)
{
bool needs_set = MyPState->ring_receive != MyPState->ring_unused &&
!AmPrewarmWorker && /* do not pump prefetch state in prewarm worker */
readahead_getpage_pull_timeout_ms > 0;
if (needs_set != timeout_set)

View File

@@ -201,8 +201,6 @@ static shmem_request_hook_type prev_shmem_request_hook;
bool lfc_store_prefetch_result;
bool lfc_prewarm_update_ws_estimation;
bool AmPrewarmWorker;
#define LFC_ENABLED() (lfc_ctl->limit != 0)
/*
@@ -847,8 +845,6 @@ lfc_prewarm_main(Datum main_arg)
PrewarmWorkerState* ws;
uint32 worker_id = DatumGetInt32(main_arg);
AmPrewarmWorker = true;
pqsignal(SIGTERM, die);
BackgroundWorkerUnblockSignals();

View File

@@ -23,8 +23,6 @@ extern int wal_acceptor_connection_timeout;
extern int readahead_getpage_pull_timeout_ms;
extern bool disable_wal_prev_lsn_checks;
extern bool AmPrewarmWorker;
#if PG_MAJORVERSION_NUM >= 17
extern uint32 WAIT_EVENT_NEON_LFC_MAINTENANCE;
extern uint32 WAIT_EVENT_NEON_LFC_READ;

View File

@@ -1100,35 +1100,14 @@ async fn query_to_json<T: GenericClient>(
.map_err(SqlOverHttpError::Postgres)?;
let query_acknowledged = Instant::now();
let columns_len = row_stream.statement.columns().len();
let mut fields = Vec::with_capacity(columns_len);
let mut types = Vec::with_capacity(columns_len);
for c in row_stream.statement.columns() {
fields.push(json!({
"name": c.name().to_owned(),
"dataTypeID": c.type_().oid(),
"tableID": c.table_oid(),
"columnID": c.column_id(),
"dataTypeSize": c.type_size(),
"dataTypeModifier": c.type_modifier(),
"format": "text",
}));
types.push(c.type_().clone());
}
let raw_output = parsed_headers.raw_output;
let array_mode = data.array_mode.unwrap_or(parsed_headers.default_array_mode);
// Manually drain the stream into a vector to leave row_stream hanging
// around to get a command tag. Also check that the response is not too
// big.
let mut rows = Vec::new();
let mut rows: Vec<postgres_client::Row> = Vec::new();
while let Some(row) = row_stream.next().await {
let row = row.map_err(SqlOverHttpError::Postgres)?;
*current_size += row.body_len();
rows.push(row);
// we don't have a streaming response support yet so this is to prevent OOM
// from a malicious query (eg a cross join)
if *current_size > config.max_response_size_bytes {
@@ -1136,19 +1115,11 @@ async fn query_to_json<T: GenericClient>(
config.max_response_size_bytes,
));
}
let row = pg_text_row_to_json(&row, &types, raw_output, array_mode)?;
rows.push(row);
// assumption: parsing pg text and converting to json takes CPU time.
// let's assume it is slightly expensive, so we should consume some cooperative budget.
// Especially considering that `RowStream::next` might be pulling from a batch
// of rows and never hit the tokio mpsc for a long time (although unlikely).
tokio::task::consume_budget().await;
}
let query_resp_end = Instant::now();
let RowStream {
statement,
command_tag,
status: ready,
..
@@ -1176,6 +1147,38 @@ async fn query_to_json<T: GenericClient>(
"finished executing query"
);
let columns_len = statement.columns().len();
let mut fields = Vec::with_capacity(columns_len);
let mut columns = Vec::with_capacity(columns_len);
for c in statement.columns() {
fields.push(json!({
"name": c.name().to_owned(),
"dataTypeID": c.type_().oid(),
"tableID": c.table_oid(),
"columnID": c.column_id(),
"dataTypeSize": c.type_size(),
"dataTypeModifier": c.type_modifier(),
"format": "text",
}));
match client.get_type(c.type_oid()).await {
Ok(t) => columns.push(t),
Err(err) => {
tracing::warn!(?err, "unable to query type information");
return Err(SqlOverHttpError::InternalPostgres(err));
}
}
}
let array_mode = data.array_mode.unwrap_or(parsed_headers.default_array_mode);
// convert rows to JSON
let rows = rows
.iter()
.map(|row| pg_text_row_to_json(row, &columns, parsed_headers.raw_output, array_mode))
.collect::<Result<Vec<_>, _>>()?;
// Resulting JSON format is based on the format of node-postgres result.
let results = json!({
"command": command_tag_name.to_string(),

View File

@@ -32,6 +32,12 @@ BENCHMARKS_DURATION_QUERY = """
# the total duration varies from 8 to 40 minutes.
# We use some pre-collected durations as a fallback to have a better distribution.
FALLBACK_DURATION = {
"test_runner/performance/pageserver/pagebench/test_pageserver_max_throughput_getpage_at_latest_lsn.py::test_pageserver_max_throughput_getpage_at_latest_lsn[1-13-30]": 400.15,
"test_runner/performance/pageserver/pagebench/test_pageserver_max_throughput_getpage_at_latest_lsn.py::test_pageserver_max_throughput_getpage_at_latest_lsn[1-6-30]": 372.521,
"test_runner/performance/pageserver/pagebench/test_pageserver_max_throughput_getpage_at_latest_lsn.py::test_pageserver_max_throughput_getpage_at_latest_lsn[10-13-30]": 420.017,
"test_runner/performance/pageserver/pagebench/test_pageserver_max_throughput_getpage_at_latest_lsn.py::test_pageserver_max_throughput_getpage_at_latest_lsn[10-6-30]": 373.769,
"test_runner/performance/pageserver/pagebench/test_pageserver_max_throughput_getpage_at_latest_lsn.py::test_pageserver_max_throughput_getpage_at_latest_lsn[100-13-30]": 678.742,
"test_runner/performance/pageserver/pagebench/test_pageserver_max_throughput_getpage_at_latest_lsn.py::test_pageserver_max_throughput_getpage_at_latest_lsn[100-6-30]": 512.135,
"test_runner/performance/test_branch_creation.py::test_branch_creation_heavy_write[20]": 58.036,
"test_runner/performance/test_branch_creation.py::test_branch_creation_many_relations": 22.104,
"test_runner/performance/test_branch_creation.py::test_branch_creation_many[1024]": 126.073,

View File

@@ -17,14 +17,12 @@ use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use bytes::Bytes;
use camino::Utf8PathBuf;
use clap::{Parser, command};
use futures::future::OptionFuture;
use futures_core::Stream;
use futures_util::StreamExt;
use http_body_util::combinators::BoxBody;
use http_body_util::{Empty, Full};
use http_body_util::Full;
use http_utils::tls_certs::ReloadingCertificateResolver;
use hyper::body::Incoming;
use hyper::header::CONTENT_TYPE;
@@ -48,6 +46,7 @@ use tokio::net::TcpListener;
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
use tokio::time;
use tonic::body::{self, BoxBody, empty_body};
use tonic::codegen::Service;
use tonic::{Code, Request, Response, Status};
use tracing::*;
@@ -635,7 +634,7 @@ impl BrokerService for Broker {
// We serve only metrics and healthcheck through http1.
async fn http1_handler(
req: hyper::Request<Incoming>,
) -> Result<hyper::Response<BoxBody<Bytes, Infallible>>, Infallible> {
) -> Result<hyper::Response<BoxBody>, Infallible> {
let resp = match (req.method(), req.uri().path()) {
(&Method::GET, "/metrics") => {
let mut buffer = vec![];
@@ -646,16 +645,16 @@ async fn http1_handler(
hyper::Response::builder()
.status(StatusCode::OK)
.header(CONTENT_TYPE, encoder.format_type())
.body(BoxBody::new(Full::new(Bytes::from(buffer))))
.body(body::boxed(Full::new(bytes::Bytes::from(buffer))))
.unwrap()
}
(&Method::GET, "/status") => hyper::Response::builder()
.status(StatusCode::OK)
.body(BoxBody::new(Empty::new()))
.body(empty_body())
.unwrap(),
_ => hyper::Response::builder()
.status(StatusCode::NOT_FOUND)
.body(BoxBody::new(Empty::new()))
.body(empty_body())
.unwrap(),
};
Ok(resp)

View File

@@ -682,7 +682,7 @@ class NeonEnvBuilder:
log.info(
f"Copying pageserver tenants directory {tenants_from_dir} to {tenants_to_dir}"
)
subprocess.run(["cp", "-a", tenants_from_dir, tenants_to_dir], check=True)
shutil.copytree(tenants_from_dir, tenants_to_dir)
else:
log.info(
f"Creating overlayfs mount of pageserver tenants directory {tenants_from_dir} to {tenants_to_dir}"
@@ -698,9 +698,8 @@ class NeonEnvBuilder:
shutil.rmtree(self.repo_dir / "local_fs_remote_storage", ignore_errors=True)
if self.test_overlay_dir is None:
log.info("Copying local_fs_remote_storage directory from snapshot")
subprocess.run(
["cp", "-a", f"{repo_dir / 'local_fs_remote_storage'}", f"{self.repo_dir}"],
check=True,
shutil.copytree(
repo_dir / "local_fs_remote_storage", self.repo_dir / "local_fs_remote_storage"
)
else:
log.info("Creating overlayfs mount of local_fs_remote_storage directory from snapshot")
@@ -1224,7 +1223,6 @@ class NeonEnv:
# Create config for pageserver
http_auth_type = "NeonJWT" if config.auth_enabled else "Trust"
pg_auth_type = "NeonJWT" if config.auth_enabled else "Trust"
grpc_auth_type = "NeonJWT" if config.auth_enabled else "Trust"
for ps_id in range(
self.BASE_PAGESERVER_ID, self.BASE_PAGESERVER_ID + config.num_pageservers
):
@@ -1251,7 +1249,6 @@ class NeonEnv:
else None,
"pg_auth_type": pg_auth_type,
"http_auth_type": http_auth_type,
"grpc_auth_type": grpc_auth_type,
"availability_zone": availability_zone,
# Disable pageserver disk syncs in tests: when running tests concurrently, this avoids
# the pageserver taking a long time to start up due to syncfs flushing other tests' data

View File

@@ -14,7 +14,7 @@ from fixtures.neon_fixtures import (
PgBin,
wait_for_last_flush_lsn,
)
from fixtures.utils import get_scale_for_db, humantime_to_ms
from fixtures.utils import get_scale_for_db, humantime_to_ms, skip_on_ci
from performance.pageserver.util import setup_pageserver_with_tenants
@@ -36,6 +36,9 @@ if TYPE_CHECKING:
@pytest.mark.parametrize("pgbench_scale", [get_scale_for_db(200)])
@pytest.mark.parametrize("n_tenants", [500])
@pytest.mark.timeout(10000)
@skip_on_ci(
"This test needs lot of resources and should run on dedicated HW, not in github action runners as part of CI"
)
def test_pageserver_characterize_throughput_with_n_tenants(
neon_env_builder: NeonEnvBuilder,
zenbenchmark: NeonBenchmarker,
@@ -60,6 +63,9 @@ def test_pageserver_characterize_throughput_with_n_tenants(
@pytest.mark.parametrize("n_clients", [1, 64])
@pytest.mark.parametrize("n_tenants", [1])
@pytest.mark.timeout(2400)
@skip_on_ci(
"This test needs lot of resources and should run on dedicated HW, not in github action runners as part of CI"
)
def test_pageserver_characterize_latencies_with_1_client_and_throughput_with_many_clients_one_tenant(
neon_env_builder: NeonEnvBuilder,
zenbenchmark: NeonBenchmarker,

View File

@@ -1,77 +0,0 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from fixtures.utils import wait_until
if TYPE_CHECKING:
from fixtures.neon_fixtures import NeonEnvBuilder
def test_basebackup_cache(neon_env_builder: NeonEnvBuilder):
"""
Simple test for basebackup cache.
1. Check that we always hit the cache after compute restart.
2. Check that we eventually delete old basebackup files, but not the latest one.
3. Check that we delete basebackup file for timeline with active compute.
"""
neon_env_builder.pageserver_config_override = """
tenant_config = { basebackup_cache_enabled = true }
basebackup_cache_config = { cleanup_period = '1s' }
"""
env = neon_env_builder.init_start()
ep = env.endpoints.create("main")
ps = env.pageserver
ps_http = ps.http_client()
# 1. Check that we always hit the cache after compute restart.
for i in range(3):
ep.start()
ep.stop()
def check_metrics(i=i):
metrics = ps_http.get_metrics()
# Never miss.
# The first time compute_ctl sends `get_basebackup` with lsn=None, we do not cache such requests.
# All other requests should be a hit
assert (
metrics.query_one(
"pageserver_basebackup_cache_read_total", {"result": "miss"}
).value
== 0
)
# All but the first requests are hits.
assert (
metrics.query_one("pageserver_basebackup_cache_read_total", {"result": "hit"}).value
== i
)
# Every compute shut down should trigger a prepare reuest.
assert (
metrics.query_one(
"pageserver_basebackup_cache_prepare_total", {"result": "ok"}
).value
== i + 1
)
wait_until(check_metrics)
# 2. Check that we eventually delete old basebackup files, but not the latest one.
def check_bb_file_count():
bb_files = list(ps.workdir.joinpath("basebackup_cache").iterdir())
# tmp dir + 1 basebackup file.
assert len(bb_files) == 2
wait_until(check_bb_file_count)
# 3. Check that we delete basebackup file for timeline with active compute.
ep.start()
ep.safe_psql("create table t1 as select generate_series(1, 10) as n")
def check_bb_dir_empty():
bb_files = list(ps.workdir.joinpath("basebackup_cache").iterdir())
# only tmp dir.
assert len(bb_files) == 1
wait_until(check_bb_dir_empty)

View File

@@ -217,11 +217,11 @@ if SQL_EXPORTER is None:
self, logs_dir: Path, config_file: Path, collector_file: Path, port: int
) -> None:
# NOTE: Keep the version the same as in
# compute/compute-node.Dockerfile and build-tools.Dockerfile.
# compute/Dockerfile.compute-node and Dockerfile.build-tools.
#
# The "host" network mode allows sql_exporter to talk to the
# endpoint which is running on the host.
super().__init__("docker.io/burningalchemist/sql_exporter:0.17.3", network_mode="host")
super().__init__("docker.io/burningalchemist/sql_exporter:0.17.0", network_mode="host")
self.__logs_dir = logs_dir
self.__port = port

View File

@@ -508,9 +508,6 @@ PER_METRIC_VERIFIERS = {
"remote_storage_size": CannotVerifyAnything,
"written_size": WrittenDataVerifier,
"written_data_bytes_delta": WrittenDataDeltaVerifier,
"written_size_since_parent": WrittenDataVerifier, # same as written_size on root
"pitr_cutoff": CannotVerifyAnything,
"pitr_history_size_since_parent": WrittenDataVerifier, # same as written_size on root w/o GC
"timeline_logical_size": CannotVerifyAnything,
"synthetic_storage_size": SyntheticSizeVerifier,
}

View File

@@ -18,8 +18,6 @@ license.workspace = true
ahash = { version = "0.8" }
anstream = { version = "0.6" }
anyhow = { version = "1", features = ["backtrace"] }
axum = { version = "0.8", features = ["ws"] }
axum-core = { version = "0.5", default-features = false, features = ["tracing"] }
base64-594e8ee84c453af0 = { package = "base64", version = "0.13", features = ["alloc"] }
base64-647d43efb71741da = { package = "base64", version = "0.21" }
base64ct = { version = "1", default-features = false, features = ["std"] }
@@ -54,8 +52,9 @@ hex = { version = "0.4", features = ["serde"] }
hmac = { version = "0.12", default-features = false, features = ["reset"] }
hyper-582f2526e08bb6a0 = { package = "hyper", version = "0.14", features = ["client", "http1", "http2", "runtime", "server", "stream"] }
hyper-dff4ba8e3ae991db = { package = "hyper", version = "1", features = ["full"] }
hyper-util = { version = "0.1", features = ["client-legacy", "server-auto", "service"] }
indexmap = { version = "2", features = ["serde"] }
hyper-util = { version = "0.1", features = ["client-legacy", "http1", "http2", "server", "service"] }
indexmap-dff4ba8e3ae991db = { package = "indexmap", version = "1", default-features = false, features = ["std"] }
indexmap-f595c2ba2a3f28df = { package = "indexmap", version = "2", features = ["serde"] }
itertools = { version = "0.12" }
lazy_static = { version = "1", default-features = false, features = ["spin_no_std"] }
libc = { version = "0.2", features = ["extra_traits", "use_std"] }
@@ -83,7 +82,7 @@ regex-syntax = { version = "0.8" }
reqwest = { version = "0.12", default-features = false, features = ["blocking", "json", "rustls-tls", "rustls-tls-native-roots", "stream"] }
rustls = { version = "0.23", default-features = false, features = ["logging", "ring", "std", "tls12"] }
rustls-pki-types = { version = "1", features = ["std"] }
rustls-webpki = { version = "0.103", default-features = false, features = ["ring", "std"] }
rustls-webpki = { version = "0.102", default-features = false, features = ["ring", "std"] }
scopeguard = { version = "1" }
sec1 = { version = "0.7", features = ["pem", "serde", "std", "subtle"] }
serde = { version = "1", features = ["alloc", "derive"] }
@@ -100,10 +99,11 @@ tikv-jemalloc-sys = { version = "0.6", features = ["profiling", "stats", "unpref
time = { version = "0.3", features = ["macros", "serde-well-known"] }
tokio = { version = "1", features = ["full", "test-util"] }
tokio-rustls = { version = "0.26", default-features = false, features = ["logging", "ring", "tls12"] }
tokio-stream = { version = "0.1", features = ["net"] }
tokio-stream = { version = "0.1" }
tokio-util = { version = "0.7", features = ["codec", "compat", "io", "rt"] }
toml_edit = { version = "0.22", features = ["serde"] }
tower = { version = "0.5", default-features = false, features = ["balance", "buffer", "limit", "log"] }
tonic = { version = "0.12", default-features = false, features = ["codegen", "prost", "tls-roots"] }
tower = { version = "0.4", default-features = false, features = ["balance", "buffer", "limit", "util"] }
tracing = { version = "0.1", features = ["log"] }
tracing-core = { version = "0.1" }
tracing-log = { version = "0.2" }
@@ -125,7 +125,8 @@ either = { version = "1" }
getrandom = { version = "0.2", default-features = false, features = ["std"] }
half = { version = "2", default-features = false, features = ["num-traits"] }
hashbrown = { version = "0.14", features = ["raw"] }
indexmap = { version = "2", features = ["serde"] }
indexmap-dff4ba8e3ae991db = { package = "indexmap", version = "1", default-features = false, features = ["std"] }
indexmap-f595c2ba2a3f28df = { package = "indexmap", version = "2", features = ["serde"] }
itertools = { version = "0.12" }
libc = { version = "0.2", features = ["extra_traits", "use_std"] }
log = { version = "0.4", default-features = false, features = ["std"] }