Compare commits

...

7 Commits

Author SHA1 Message Date
Alex Chi Z
6556ca03db fix(pageserver): replorigin should be inherited
Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-04-28 10:21:42 -04:00
Alex Chi Z.
11f6044338 fix(pageserver): report synthetic size = 1 if all tls offloaded (2) (#11731)
## Problem

https://github.com/neondatabase/neon/pull/11648 did this for resident
size instead of synthetic size.

## Summary of changes

Report synthetic_size == 1 if all timelines are offloaded.

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-04-28 13:45:45 +00:00
Konstantin Knizhnik
692c0f3fb8 Prepare to prewarm support (#11740)
## Problem

See
(original prewarm implementation)
https://github.com/neondatabase/neon/pull/9197
(functions for storing/restoring LFC state)
https://github.com/neondatabase/neon/pull/9587
(store prefetch results in LFC)
https://github.com/neondatabase/neon/pull/10442

## Summary of changes

Preparation for prewarm implementation.

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2025-04-28 13:24:18 +00:00
Alexander Bayandin
2b1d2a55d6 CI: fix typo oicd -> oidc (#11747)
## Problem

It's OIDC (OpenID Connect), not OICD

## Summary of changes
- Rename actions input `aws-oicd-role-arn` -> `aws-oidc-role-arn`
2025-04-28 12:44:28 +00:00
Konstantin Knizhnik
60b9fb1baf Ignore unlogged LSNs in set last written LSN (#11743)
## Problem

See https://github.com/neondatabase/neon/issues/11718
and https://neondb.slack.com/archives/C033RQ5SPDH/p1745122797538509

GIST other indexes performing "unlogged build" are using so called fake
LSNs - not a real LSN, but something like 0/1. Been stored in lwlsn
cache they cause incorrect lookup at PS.

## Summary of changes

Do not store fake LSNs in LwLSN hash.

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2025-04-28 12:16:29 +00:00
Erik Grinaker
606f14034e pageserver: improve pageserver_smgr_query_seconds buckets (#11680)
## Problem

The `pageserver_smgr_query_seconds` buckets are too coarse, using powers
of 10: 1 µs, 10 µs, 100 µs, 1 ms, 10 ms, 100 ms, 1 s, 10 s, 100 s. This
is one of our most crucial latency metrics, and needs better resolution.

Touches #11594.

## Summary of changes

This patch uses buckets with better resolution around 1 ms (the typical
latency):

* 0.6 ms
* 1 ms
* 3 ms
* 6 ms
* 10 ms
* 30 ms
* 100 ms
* 1 s
* 3 s

These will be the same as the compute's `compute_getpage_wait_seconds`,
to make them comparable across the compute and Pageserver:
https://github.com/neondatabase/flux-fleet/pull/579. We sacrifice
buckets above 3 s, since these can already be considered "too slow".

This does not change the previously used `CRITICAL_OP_BUCKETS`, which is
also used for other operations on different timescales (e.g. LSN waits).
We should consider replacing this with more appropriate buckets for
specific operations, since it covers a large span with low resolution.
2025-04-28 11:52:44 +00:00
Conrad Ludgate
32393b4393 pg-sni-router: support compute TLS on different port (#11732)
## Problem

pg-sni-router isn't aware of compute TLS

## Summary of changes

If connections come in on port 4433, we require TLS to compute from
pg-sni-router
2025-04-28 11:29:44 +00:00
28 changed files with 320 additions and 152 deletions

View File

@@ -7,7 +7,7 @@ inputs:
type: boolean
required: false
default: false
aws-oicd-role-arn:
aws-oidc-role-arn:
description: 'OIDC role arn to interract with S3'
required: true
@@ -88,7 +88,7 @@ runs:
if: ${{ !cancelled() }}
with:
aws-region: eu-central-1
role-to-assume: ${{ inputs.aws-oicd-role-arn }}
role-to-assume: ${{ inputs.aws-oidc-role-arn }}
role-duration-seconds: 3600 # 1 hour should be more than enough to upload report
# Potentially we could have several running build for the same key (for example, for the main branch), so we use improvised lock for this

View File

@@ -8,7 +8,7 @@ inputs:
unique-key:
description: 'string to distinguish different results in the same run'
required: true
aws-oicd-role-arn:
aws-oidc-role-arn:
description: 'OIDC role arn to interract with S3'
required: true
@@ -39,7 +39,7 @@ runs:
if: ${{ !cancelled() }}
with:
aws-region: eu-central-1
role-to-assume: ${{ inputs.aws-oicd-role-arn }}
role-to-assume: ${{ inputs.aws-oidc-role-arn }}
role-duration-seconds: 3600 # 1 hour should be more than enough to upload report
- name: Upload test results

View File

@@ -15,7 +15,7 @@ inputs:
prefix:
description: "S3 prefix. Default is '${GITHUB_RUN_ID}/${GITHUB_RUN_ATTEMPT}'"
required: false
aws-oicd-role-arn:
aws-oidc-role-arn:
description: 'OIDC role arn to interract with S3'
required: true
@@ -25,7 +25,7 @@ runs:
- uses: aws-actions/configure-aws-credentials@v4
with:
aws-region: eu-central-1
role-to-assume: ${{ inputs.aws-oicd-role-arn }}
role-to-assume: ${{ inputs.aws-oidc-role-arn }}
role-duration-seconds: 3600
- name: Download artifact

View File

@@ -53,7 +53,7 @@ inputs:
description: 'benchmark durations JSON'
required: false
default: '{}'
aws-oicd-role-arn:
aws-oidc-role-arn:
description: 'OIDC role arn to interract with S3'
required: true
@@ -66,7 +66,7 @@ runs:
with:
name: neon-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build_type }}${{ inputs.sanitizers == 'enabled' && '-sanitized' || '' }}-artifact
path: /tmp/neon
aws-oicd-role-arn: ${{ inputs.aws-oicd-role-arn }}
aws-oidc-role-arn: ${{ inputs.aws-oidc-role-arn }}
- name: Download Neon binaries for the previous release
if: inputs.build_type != 'remote'
@@ -75,7 +75,7 @@ runs:
name: neon-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build_type }}-artifact
path: /tmp/neon-previous
prefix: latest
aws-oicd-role-arn: ${{ inputs.aws-oicd-role-arn }}
aws-oidc-role-arn: ${{ inputs.aws-oidc-role-arn }}
- name: Download compatibility snapshot
if: inputs.build_type != 'remote'
@@ -87,7 +87,7 @@ runs:
# The lack of compatibility snapshot (for example, for the new Postgres version)
# shouldn't fail the whole job. Only relevant test should fail.
skip-if-does-not-exist: true
aws-oicd-role-arn: ${{ inputs.aws-oicd-role-arn }}
aws-oidc-role-arn: ${{ inputs.aws-oidc-role-arn }}
- name: Checkout
if: inputs.needs_postgres_source == 'true'
@@ -228,13 +228,13 @@ runs:
# The lack of compatibility snapshot shouldn't fail the job
# (for example if we didn't run the test for non build-and-test workflow)
skip-if-does-not-exist: true
aws-oicd-role-arn: ${{ inputs.aws-oicd-role-arn }}
aws-oidc-role-arn: ${{ inputs.aws-oidc-role-arn }}
- uses: aws-actions/configure-aws-credentials@v4
if: ${{ !cancelled() }}
with:
aws-region: eu-central-1
role-to-assume: ${{ inputs.aws-oicd-role-arn }}
role-to-assume: ${{ inputs.aws-oidc-role-arn }}
role-duration-seconds: 3600 # 1 hour should be more than enough to upload report
- name: Upload test results
@@ -243,4 +243,4 @@ runs:
with:
report-dir: /tmp/test_output/allure/results
unique-key: ${{ inputs.build_type }}-${{ inputs.pg_version }}-${{ runner.arch }}
aws-oicd-role-arn: ${{ inputs.aws-oicd-role-arn }}
aws-oidc-role-arn: ${{ inputs.aws-oidc-role-arn }}

View File

@@ -14,11 +14,11 @@ runs:
name: coverage-data-artifact
path: /tmp/coverage
skip-if-does-not-exist: true # skip if there's no previous coverage to download
aws-oicd-role-arn: ${{ inputs.aws-oicd-role-arn }}
aws-oidc-role-arn: ${{ inputs.aws-oidc-role-arn }}
- name: Upload coverage data
uses: ./.github/actions/upload
with:
name: coverage-data-artifact
path: /tmp/coverage
aws-oicd-role-arn: ${{ inputs.aws-oicd-role-arn }}
aws-oidc-role-arn: ${{ inputs.aws-oidc-role-arn }}

View File

@@ -14,7 +14,7 @@ inputs:
prefix:
description: "S3 prefix. Default is '${GITHUB_SHA}/${GITHUB_RUN_ID}/${GITHUB_RUN_ATTEMPT}'"
required: false
aws-oicd-role-arn:
aws-oidc-role-arn:
description: "the OIDC role arn for aws auth"
required: false
default: ""
@@ -61,7 +61,7 @@ runs:
uses: aws-actions/configure-aws-credentials@v4
with:
aws-region: eu-central-1
role-to-assume: ${{ inputs.aws-oicd-role-arn }}
role-to-assume: ${{ inputs.aws-oidc-role-arn }}
role-duration-seconds: 3600
- name: Upload artifact

View File

@@ -81,7 +81,7 @@ jobs:
name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact
path: /tmp/neon/
prefix: latest
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
# we create a table that has one row for each database that we want to restore with the status whether the restore is done
- name: Create benchmark_restore_status table if it does not exist

View File

@@ -323,7 +323,7 @@ jobs:
with:
name: neon-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}${{ inputs.sanitizers == 'enabled' && '-sanitized' || '' }}-artifact
path: /tmp/neon
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Check diesel schema
if: inputs.build-type == 'release' && inputs.arch == 'x64'
@@ -394,7 +394,7 @@ jobs:
rerun_failed: ${{ inputs.test-run-count == 1 }}
pg_version: ${{ matrix.pg_version }}
sanitizers: ${{ inputs.sanitizers }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
# `--session-timeout` is equal to (timeout-minutes - 10 minutes) * 60 seconds.
# Attempt to stop tests gracefully to generate test reports
# until they are forcibly stopped by the stricter `timeout-minutes` limit.

View File

@@ -114,7 +114,7 @@ jobs:
name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact
path: /tmp/neon/
prefix: latest
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Create Neon Project
id: create-neon-project
@@ -132,7 +132,7 @@ jobs:
run_in_parallel: false
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
pg_version: ${{ env.PG_VERSION }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
# Set --sparse-ordering option of pytest-order plugin
# to ensure tests are running in order of appears in the file.
# It's important for test_perf_pgbench.py::test_pgbench_remote_* tests
@@ -165,7 +165,7 @@ jobs:
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report-generate
with:
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Post to a Slack channel
if: ${{ github.event.schedule && failure() }}
@@ -222,8 +222,8 @@ jobs:
name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact
path: /tmp/neon/
prefix: latest
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Verify that cumulative statistics are preserved
uses: ./.github/actions/run-python-test-set
with:
@@ -233,7 +233,7 @@ jobs:
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 3600
pg_version: ${{ env.DEFAULT_PG_VERSION }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
@@ -282,7 +282,7 @@ jobs:
name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact
path: /tmp/neon/
prefix: latest
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Run Logical Replication benchmarks
uses: ./.github/actions/run-python-test-set
@@ -293,7 +293,7 @@ jobs:
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 5400
pg_version: ${{ env.DEFAULT_PG_VERSION }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
@@ -310,7 +310,7 @@ jobs:
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 5400
pg_version: ${{ env.DEFAULT_PG_VERSION }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
@@ -322,7 +322,7 @@ jobs:
uses: ./.github/actions/allure-report-generate
with:
store-test-results-into-db: true
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
REGRESS_TEST_RESULT_CONNSTR_NEW: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}
@@ -505,7 +505,7 @@ jobs:
name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact
path: /tmp/neon/
prefix: latest
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Create Neon Project
if: contains(fromJSON('["neonvm-captest-new", "neonvm-captest-new-many-tables", "neonvm-captest-freetier", "neonvm-azure-captest-freetier", "neonvm-azure-captest-new"]'), matrix.platform)
@@ -557,7 +557,7 @@ jobs:
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 21600 -k test_perf_many_relations
pg_version: ${{ env.PG_VERSION }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
@@ -573,7 +573,7 @@ jobs:
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 21600 -k test_pgbench_remote_init
pg_version: ${{ env.PG_VERSION }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
@@ -588,7 +588,7 @@ jobs:
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 21600 -k test_pgbench_remote_simple_update
pg_version: ${{ env.PG_VERSION }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
@@ -603,7 +603,7 @@ jobs:
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 21600 -k test_pgbench_remote_select_only
pg_version: ${{ env.PG_VERSION }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
@@ -621,7 +621,7 @@ jobs:
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report-generate
with:
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Post to a Slack channel
if: ${{ github.event.schedule && failure() }}
@@ -694,7 +694,7 @@ jobs:
name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact
path: /tmp/neon/
prefix: latest
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Set up Connection String
id: set-up-connstr
@@ -726,7 +726,7 @@ jobs:
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 21600 -k test_pgvector_indexing
pg_version: ${{ env.PG_VERSION }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
@@ -741,7 +741,7 @@ jobs:
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 21600
pg_version: ${{ env.PG_VERSION }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
@@ -752,7 +752,7 @@ jobs:
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report-generate
with:
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Post to a Slack channel
if: ${{ github.event.schedule && failure() }}
@@ -828,7 +828,7 @@ jobs:
name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact
path: /tmp/neon/
prefix: latest
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Set up Connection String
id: set-up-connstr
@@ -871,7 +871,7 @@ jobs:
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 43200 -k test_clickbench
pg_version: ${{ env.PG_VERSION }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
@@ -885,7 +885,7 @@ jobs:
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report-generate
with:
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Post to a Slack channel
if: ${{ github.event.schedule && failure() }}
@@ -954,7 +954,7 @@ jobs:
name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact
path: /tmp/neon/
prefix: latest
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Get Connstring Secret Name
run: |
@@ -1003,7 +1003,7 @@ jobs:
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 21600 -k test_tpch
pg_version: ${{ env.PG_VERSION }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
@@ -1015,7 +1015,7 @@ jobs:
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report-generate
with:
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Post to a Slack channel
if: ${{ github.event.schedule && failure() }}
@@ -1078,7 +1078,7 @@ jobs:
name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact
path: /tmp/neon/
prefix: latest
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Set up Connection String
id: set-up-connstr
@@ -1121,7 +1121,7 @@ jobs:
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 21600 -k test_user_examples
pg_version: ${{ env.PG_VERSION }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
@@ -1132,7 +1132,7 @@ jobs:
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report-generate
with:
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Post to a Slack channel
if: ${{ github.event.schedule && failure() }}

View File

@@ -93,7 +93,7 @@ jobs:
uses: ./.github/actions/allure-report-generate
with:
store-test-results-into-db: true
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
REGRESS_TEST_RESULT_CONNSTR_NEW: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_DEV }}

View File

@@ -317,7 +317,7 @@ jobs:
extra_params: --splits 5 --group ${{ matrix.pytest_split_group }}
benchmark_durations: ${{ needs.get-benchmarks-durations.outputs.json }}
pg_version: v16
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
@@ -384,7 +384,7 @@ jobs:
uses: ./.github/actions/allure-report-generate
with:
store-test-results-into-db: true
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
REGRESS_TEST_RESULT_CONNSTR_NEW: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}
@@ -451,14 +451,14 @@ jobs:
with:
name: neon-${{ runner.os }}-${{ runner.arch }}-${{ matrix.build_type }}-artifact
path: /tmp/neon
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Get coverage artifact
uses: ./.github/actions/download
with:
name: coverage-data-artifact
path: /tmp/coverage
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Merge coverage data
run: scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage merge

View File

@@ -117,7 +117,7 @@ jobs:
uses: ./.github/actions/allure-report-generate
with:
store-test-results-into-db: true
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
REGRESS_TEST_RESULT_CONNSTR_NEW: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}

View File

@@ -89,7 +89,7 @@ jobs:
name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact
path: /tmp/neon/
prefix: latest
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Create a new branch
id: create-branch
@@ -105,7 +105,7 @@ jobs:
test_selection: cloud_regress
pg_version: ${{matrix.pg-version}}
extra_params: -m remote_cluster
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
BENCHMARK_CONNSTR: ${{steps.create-branch.outputs.dsn}}
@@ -122,7 +122,7 @@ jobs:
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report-generate
with:
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Post to a Slack channel
if: ${{ github.event.schedule && failure() }}

View File

@@ -32,7 +32,7 @@ jobs:
fail-fast: false # allow other variants to continue even if one fails
matrix:
include:
- target_project: new_empty_project_stripe_size_2048
- target_project: new_empty_project_stripe_size_2048
stripe_size: 2048 # 16 MiB
postgres_version: 16
disable_sharding: false
@@ -98,7 +98,7 @@ jobs:
name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact
path: /tmp/neon/
prefix: latest
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Create Neon Project
if: ${{ startsWith(matrix.target_project, 'new_empty_project') }}
@@ -110,10 +110,10 @@ jobs:
compute_units: '[7, 7]' # we want to test large compute here to avoid compute-side bottleneck
api_key: ${{ secrets.NEON_STAGING_API_KEY }}
shard_split_project: ${{ matrix.stripe_size != null && 'true' || 'false' }}
admin_api_key: ${{ secrets.NEON_STAGING_ADMIN_API_KEY }}
admin_api_key: ${{ secrets.NEON_STAGING_ADMIN_API_KEY }}
shard_count: 8
stripe_size: ${{ matrix.stripe_size }}
disable_sharding: ${{ matrix.disable_sharding }}
disable_sharding: ${{ matrix.disable_sharding }}
- name: Initialize Neon project
if: ${{ startsWith(matrix.target_project, 'new_empty_project') }}
@@ -171,7 +171,7 @@ jobs:
extra_params: -s -m remote_cluster --timeout 86400 -k test_ingest_performance_using_pgcopydb
pg_version: v${{ matrix.postgres_version }}
save_perf_report: true
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
BENCHMARK_INGEST_SOURCE_CONNSTR: ${{ secrets.BENCHMARK_INGEST_SOURCE_CONNSTR }}
TARGET_PROJECT_TYPE: ${{ matrix.target_project }}

View File

@@ -33,9 +33,9 @@ jobs:
fail-fast: false # allow other variants to continue even if one fails
matrix:
include:
- target: new_branch
- target: new_branch
custom_scripts: insert_webhooks.sql@200 select_any_webhook_with_skew.sql@300 select_recent_webhook.sql@397 select_prefetch_webhook.sql@3 IUD_one_transaction.sql@100
- target: reuse_branch
- target: reuse_branch
custom_scripts: insert_webhooks.sql@200 select_any_webhook_with_skew.sql@300 select_recent_webhook.sql@397 select_prefetch_webhook.sql@3 IUD_one_transaction.sql@100
max-parallel: 1 # we want to run each stripe size sequentially to be able to compare the results
permissions:
@@ -43,7 +43,7 @@ jobs:
statuses: write
id-token: write # aws-actions/configure-aws-credentials
env:
TEST_PG_BENCH_DURATIONS_MATRIX: "1h" # todo update to > 1 h
TEST_PG_BENCH_DURATIONS_MATRIX: "1h" # todo update to > 1 h
TEST_PGBENCH_CUSTOM_SCRIPTS: ${{ matrix.custom_scripts }}
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
PG_VERSION: 16 # pre-determined by pre-determined project
@@ -85,7 +85,7 @@ jobs:
name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact
path: /tmp/neon/
prefix: latest
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Create Neon Branch for large tenant
if: ${{ matrix.target == 'new_branch' }}
@@ -129,7 +129,7 @@ jobs:
${PSQL} "${BENCHMARK_CONNSTR}" -c "SET statement_timeout = 0; DELETE FROM webhook.incoming_webhooks WHERE created_at > '2025-02-27 23:59:59+00';"
echo "$(date '+%Y-%m-%d %H:%M:%S') - Finished deleting rows in table webhook.incoming_webhooks from prior runs"
- name: Benchmark pgbench with custom-scripts
- name: Benchmark pgbench with custom-scripts
uses: ./.github/actions/run-python-test-set
with:
build_type: ${{ env.BUILD_TYPE }}
@@ -138,7 +138,7 @@ jobs:
save_perf_report: true
extra_params: -m remote_cluster --timeout 7200 -k test_perf_oltp_large_tenant_pgbench
pg_version: ${{ env.PG_VERSION }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
@@ -153,7 +153,7 @@ jobs:
save_perf_report: true
extra_params: -m remote_cluster --timeout 172800 -k test_perf_oltp_large_tenant_maintenance
pg_version: ${{ env.PG_VERSION }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr_without_pooler }}
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
@@ -179,8 +179,8 @@ jobs:
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report-generate
with:
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Post to a Slack channel
if: ${{ github.event.schedule && failure() }}
uses: slackapi/slack-github-action@fcfb566f8b0aab22203f066d80ca1d7e4b5d05b3 # v1.27.1

View File

@@ -147,7 +147,7 @@ jobs:
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report-generate
with:
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Post to a Slack channel
if: ${{ github.event.schedule && failure() }}

View File

@@ -103,7 +103,7 @@ jobs:
name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact
path: /tmp/neon/
prefix: latest
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Create Neon Project
id: create-neon-project
@@ -122,7 +122,7 @@ jobs:
run_in_parallel: false
extra_params: -m remote_cluster
pg_version: ${{ env.DEFAULT_PG_VERSION }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
BENCHMARK_CONNSTR: ${{ steps.create-neon-project.outputs.dsn }}
@@ -139,7 +139,7 @@ jobs:
uses: ./.github/actions/allure-report-generate
with:
store-test-results-into-db: true
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
REGRESS_TEST_RESULT_CONNSTR_NEW: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}
@@ -178,7 +178,7 @@ jobs:
name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact
path: /tmp/neon/
prefix: latest
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Create Neon Project
id: create-neon-project
@@ -195,7 +195,7 @@ jobs:
run_in_parallel: false
extra_params: -m remote_cluster
pg_version: ${{ env.DEFAULT_PG_VERSION }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
BENCHMARK_CONNSTR: ${{ steps.create-neon-project.outputs.dsn }}
@@ -212,7 +212,7 @@ jobs:
uses: ./.github/actions/allure-report-generate
with:
store-test-results-into-db: true
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
REGRESS_TEST_RESULT_CONNSTR_NEW: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}

View File

@@ -66,7 +66,7 @@ jobs:
name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact
path: /tmp/neon/
prefix: latest
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Run tests
uses: ./.github/actions/run-python-test-set
@@ -76,7 +76,7 @@ jobs:
run_in_parallel: false
extra_params: -m remote_cluster
pg_version: ${{ matrix.pg-version }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
NEON_API_KEY: ${{ secrets.NEON_STAGING_API_KEY }}
RANDOM_SEED: ${{ inputs.random_seed }}
@@ -88,6 +88,6 @@ jobs:
uses: ./.github/actions/allure-report-generate
with:
store-test-results-into-db: true
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
REGRESS_TEST_RESULT_CONNSTR_NEW: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}

View File

@@ -844,8 +844,6 @@ impl Key {
}
pub const fn sparse_non_inherited_keyspace() -> Range<Key> {
// The two keys are adjacent; if we will have non-adjancent keys in the future, we should return a keyspace
const_assert!(AUX_KEY_PREFIX + 1 == REPL_ORIGIN_KEY_PREFIX);
Key {
field1: AUX_KEY_PREFIX,
field2: 0,
@@ -854,7 +852,7 @@ impl Key {
field5: 0,
field6: 0,
}..Key {
field1: REPL_ORIGIN_KEY_PREFIX + 1,
field1: AUX_KEY_PREFIX + 1,
field2: 0,
field3: 0,
field4: 0,

View File

@@ -77,7 +77,9 @@ impl StorageModel {
}
SizeResult {
total_size,
// If total_size is 0, it means that the tenant has all timelines offloaded; we need to report 1
// here so that the data point shows up in the s3 files.
total_size: total_size.max(1),
segments: segment_results,
}
}

View File

@@ -1774,8 +1774,12 @@ static SMGR_QUERY_STARTED_PER_TENANT_TIMELINE: Lazy<IntCounterVec> = Lazy::new(|
.expect("failed to define a metric")
});
// Alias so all histograms recording per-timeline smgr timings use the same buckets.
static SMGR_QUERY_TIME_PER_TENANT_TIMELINE_BUCKETS: &[f64] = CRITICAL_OP_BUCKETS;
/// Per-timeline smgr histogram buckets should be the same as the compute buckets, such that the
/// metrics are comparable across compute and Pageserver. See also:
/// <https://github.com/neondatabase/neon/blob/1a87975d956a8ad17ec8b85da32a137ec4893fcc/pgxn/neon/neon_perf_counters.h#L18-L27>
/// <https://github.com/neondatabase/flux-fleet/blob/556182a939edda87ff1d85a6b02e5cec901e0e9e/apps/base/compute-metrics/scrape-compute-sql-exporter.yaml#L29-L35>
static SMGR_QUERY_TIME_PER_TENANT_TIMELINE_BUCKETS: &[f64] =
&[0.0006, 0.001, 0.003, 0.006, 0.01, 0.03, 0.1, 1.0, 3.0];
static SMGR_QUERY_TIME_PER_TENANT_TIMELINE: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(

View File

@@ -88,9 +88,6 @@ typedef PGAlignedBlock PGIOAlignedBlock;
page_server_api *page_server;
static uint32 local_request_counter;
#define GENERATE_REQUEST_ID() (((NeonRequestId)MyProcPid << 32) | ++local_request_counter)
/*
* Various settings related to prompt (fast) handling of PageStream responses
* at any CHECK_FOR_INTERRUPTS point.
@@ -788,6 +785,27 @@ prefetch_read(PrefetchRequest *slot)
}
}
/*
* Wait completion of previosly registered prefetch request.
* Prefetch result should be placed in LFC by prefetch_wait_for.
*/
bool
communicator_prefetch_receive(BufferTag tag)
{
PrfHashEntry *entry;
PrefetchRequest hashkey;
hashkey.buftag = tag;
entry = prfh_lookup(MyPState->prf_hash, &hashkey);
if (entry != NULL && prefetch_wait_for(entry->slot->my_ring_index))
{
prefetch_set_unused(entry->slot->my_ring_index);
return true;
}
return false;
}
/*
* Disconnect hook - drop prefetches when the connection drops
*
@@ -906,7 +924,6 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
NeonGetPageRequest request = {
.hdr.tag = T_NeonGetPageRequest,
.hdr.reqid = GENERATE_REQUEST_ID(),
/* lsn and not_modified_since are filled in below */
.rinfo = BufTagGetNRelFileInfo(slot->buftag),
.forknum = slot->buftag.forkNum,
@@ -915,8 +932,6 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
Assert(mySlotNo == MyPState->ring_unused);
slot->reqid = request.hdr.reqid;
if (force_request_lsns)
slot->request_lsns = *force_request_lsns;
else
@@ -934,6 +949,7 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
Assert(mySlotNo == MyPState->ring_unused);
/* loop */
}
slot->reqid = request.hdr.reqid;
/* update prefetch state */
MyPState->n_requests_inflight += 1;
@@ -1937,7 +1953,6 @@ communicator_exists(NRelFileInfo rinfo, ForkNumber forkNum, neon_request_lsns *r
{
NeonExistsRequest request = {
.hdr.tag = T_NeonExistsRequest,
.hdr.reqid = GENERATE_REQUEST_ID(),
.hdr.lsn = request_lsns->request_lsn,
.hdr.not_modified_since = request_lsns->not_modified_since,
.rinfo = rinfo,
@@ -2212,7 +2227,6 @@ communicator_nblocks(NRelFileInfo rinfo, ForkNumber forknum, neon_request_lsns *
{
NeonNblocksRequest request = {
.hdr.tag = T_NeonNblocksRequest,
.hdr.reqid = GENERATE_REQUEST_ID(),
.hdr.lsn = request_lsns->request_lsn,
.hdr.not_modified_since = request_lsns->not_modified_since,
.rinfo = rinfo,
@@ -2285,7 +2299,6 @@ communicator_dbsize(Oid dbNode, neon_request_lsns *request_lsns)
{
NeonDbSizeRequest request = {
.hdr.tag = T_NeonDbSizeRequest,
.hdr.reqid = GENERATE_REQUEST_ID(),
.hdr.lsn = request_lsns->request_lsn,
.hdr.not_modified_since = request_lsns->not_modified_since,
.dbNode = dbNode,
@@ -2353,7 +2366,6 @@ communicator_read_slru_segment(SlruKind kind, int64 segno, neon_request_lsns *re
request = (NeonGetSlruSegmentRequest) {
.hdr.tag = T_NeonGetSlruSegmentRequest,
.hdr.reqid = GENERATE_REQUEST_ID(),
.hdr.lsn = request_lsns->request_lsn,
.hdr.not_modified_since = request_lsns->not_modified_since,
.kind = kind,

View File

@@ -37,6 +37,8 @@ extern int communicator_prefetch_lookupv(NRelFileInfo rinfo, ForkNumber forknum,
BlockNumber nblocks, void **buffers, bits8 *mask);
extern void communicator_prefetch_register_bufferv(BufferTag tag, neon_request_lsns *frlsns,
BlockNumber nblocks, const bits8 *mask);
extern bool communicator_prefetch_receive(BufferTag tag);
extern int communicator_read_slru_segment(SlruKind kind, int64 segno,
neon_request_lsns *request_lsns,
void *buffer);

View File

@@ -25,6 +25,7 @@
#include "pgstat.h"
#include "port/pg_iovec.h"
#include "postmaster/bgworker.h"
#include "postmaster/interrupt.h"
#include RELFILEINFO_HDR
#include "storage/buf_internals.h"
#include "storage/fd.h"
@@ -32,6 +33,8 @@
#include "storage/latch.h"
#include "storage/lwlock.h"
#include "storage/pg_shmem.h"
#include "storage/procsignal.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/dynahash.h"
#include "utils/guc.h"
@@ -46,6 +49,8 @@
#include "neon.h"
#include "neon_lwlsncache.h"
#include "neon_perf_counters.h"
#include "pagestore_client.h"
#include "communicator.h"
#define CriticalAssert(cond) do if (!(cond)) elog(PANIC, "LFC: assertion %s failed at %s:%d: ", #cond, __FILE__, __LINE__); while (0)
@@ -87,14 +92,14 @@
* 1Mb chunks can reduce hash map size to 320Mb.
* 2. Improve access locality, subsequent pages will be allocated together improving seqscan speed
*/
#define BLOCKS_PER_CHUNK 128 /* 1Mb chunk */
/*
* Smaller chunk seems to be better for OLTP workload
*/
// #define BLOCKS_PER_CHUNK 8 /* 64kb chunk */
#define MAX_BLOCKS_PER_CHUNK_LOG 7 /* 1Mb chunk */
#define MAX_BLOCKS_PER_CHUNK (1 << MAX_BLOCKS_PER_CHUNK_LOG)
#define MB ((uint64)1024*1024)
#define SIZE_MB_TO_CHUNKS(size) ((uint32)((size) * MB / BLCKSZ / BLOCKS_PER_CHUNK))
#define SIZE_MB_TO_CHUNKS(size) ((uint32)((size) * MB / BLCKSZ >> lfc_chunk_size_log))
#define BLOCK_TO_CHUNK_OFF(blkno) ((blkno) & (lfc_blocks_per_chunk-1))
/*
* Blocks are read or written to LFC file outside LFC critical section.
@@ -119,10 +124,11 @@ typedef struct FileCacheEntry
uint32 hash;
uint32 offset;
uint32 access_count;
uint32 state[(BLOCKS_PER_CHUNK + 31) / 32 * 2]; /* two bits per block */
dlist_node list_node; /* LRU/holes list node */
uint32 state[FLEXIBLE_ARRAY_MEMBER]; /* two bits per block */
} FileCacheEntry;
#define FILE_CACHE_ENRTY_SIZE MAXALIGN(offsetof(FileCacheEntry, state) + (lfc_blocks_per_chunk*2+31)/32*4)
#define GET_STATE(entry, i) (((entry)->state[(i) / 16] >> ((i) % 16 * 2)) & 3)
#define SET_STATE(entry, i, new_state) (entry)->state[(i) / 16] = ((entry)->state[(i) / 16] & ~(3 << ((i) % 16 * 2))) | ((new_state) << ((i) % 16 * 2))
@@ -136,6 +142,7 @@ typedef struct FileCacheControl
uint32 size; /* size of cache file in chunks */
uint32 used; /* number of used chunks */
uint32 used_pages; /* number of used pages */
uint32 pinned; /* number of pinned chunks */
uint32 limit; /* shared copy of lfc_size_limit */
uint64 hits;
uint64 misses;
@@ -158,6 +165,8 @@ static int lfc_desc = -1;
static LWLockId lfc_lock;
static int lfc_max_size;
static int lfc_size_limit;
static int lfc_chunk_size_log = MAX_BLOCKS_PER_CHUNK_LOG;
static int lfc_blocks_per_chunk = MAX_BLOCKS_PER_CHUNK;
static char *lfc_path;
static uint64 lfc_generation;
static FileCacheControl *lfc_ctl;
@@ -206,7 +215,9 @@ lfc_switch_off(void)
}
lfc_ctl->generation += 1;
lfc_ctl->size = 0;
lfc_ctl->pinned = 0;
lfc_ctl->used = 0;
lfc_ctl->used_pages = 0;
lfc_ctl->limit = 0;
dlist_init(&lfc_ctl->lru);
dlist_init(&lfc_ctl->holes);
@@ -296,7 +307,7 @@ lfc_shmem_startup(void)
lfc_lock = (LWLockId) GetNamedLWLockTranche("lfc_lock");
info.keysize = sizeof(BufferTag);
info.entrysize = sizeof(FileCacheEntry);
info.entrysize = FILE_CACHE_ENRTY_SIZE;
/*
* n_chunks+1 because we add new element to hash table before eviction
@@ -342,7 +353,7 @@ lfc_shmem_request(void)
prev_shmem_request_hook();
#endif
RequestAddinShmemSpace(sizeof(FileCacheControl) + hash_estimate_size(SIZE_MB_TO_CHUNKS(lfc_max_size) + 1, sizeof(FileCacheEntry)));
RequestAddinShmemSpace(sizeof(FileCacheControl) + hash_estimate_size(SIZE_MB_TO_CHUNKS(lfc_max_size) + 1, FILE_CACHE_ENRTY_SIZE));
RequestNamedLWLockTranche("lfc_lock", 1);
}
@@ -359,6 +370,24 @@ is_normal_backend(void)
return lfc_ctl && MyProc && UsedShmemSegAddr && !IsParallelWorker();
}
static bool
lfc_check_chunk_size(int *newval, void **extra, GucSource source)
{
if (*newval & (*newval - 1))
{
elog(ERROR, "LFC chunk size should be power of two");
return false;
}
return true;
}
static void
lfc_change_chunk_size(int newval, void* extra)
{
lfc_chunk_size_log = pg_ceil_log2_32(newval);
}
static bool
lfc_check_limit_hook(int *newval, void **extra, GucSource source)
{
@@ -415,11 +444,11 @@ lfc_change_limit_hook(int newval, void *extra)
CriticalAssert(victim->access_count == 0);
#ifdef FALLOC_FL_PUNCH_HOLE
if (fallocate(lfc_desc, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, (off_t) victim->offset * BLOCKS_PER_CHUNK * BLCKSZ, BLOCKS_PER_CHUNK * BLCKSZ) < 0)
if (fallocate(lfc_desc, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, (off_t) victim->offset * lfc_blocks_per_chunk * BLCKSZ, lfc_blocks_per_chunk * BLCKSZ) < 0)
neon_log(LOG, "Failed to punch hole in file: %m");
#endif
/* We remove the old entry, and re-enter a hole to the hash table */
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
for (int i = 0; i < lfc_blocks_per_chunk; i++)
{
bool is_page_cached = GET_STATE(victim, i) == AVAILABLE;
lfc_ctl->used_pages -= is_page_cached;
@@ -508,6 +537,19 @@ lfc_init(void)
NULL,
NULL);
DefineCustomIntVariable("neon.file_cache_chunk_size",
"LFC chunk size in blocks (should be power of two)",
NULL,
&lfc_blocks_per_chunk,
MAX_BLOCKS_PER_CHUNK,
1,
MAX_BLOCKS_PER_CHUNK,
PGC_POSTMASTER,
GUC_UNIT_BLOCKS,
lfc_check_chunk_size,
lfc_change_chunk_size,
NULL);
if (lfc_max_size == 0)
return;
@@ -530,7 +572,7 @@ lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
{
BufferTag tag;
FileCacheEntry *entry;
int chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1);
int chunk_offs = BLOCK_TO_CHUNK_OFF(blkno);
bool found = false;
uint32 hash;
@@ -539,7 +581,7 @@ lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
CopyNRelFileInfoToBufTag(tag, rinfo);
tag.forkNum = forkNum;
tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK - 1);
tag.blockNum = blkno - chunk_offs;
CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber);
hash = get_hash_value(lfc_hash, &tag);
@@ -577,9 +619,9 @@ lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber);
tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK - 1);
chunk_offs = BLOCK_TO_CHUNK_OFF(blkno);
tag.blockNum = blkno - chunk_offs;
hash = get_hash_value(lfc_hash, &tag);
chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1);
LWLockAcquire(lfc_lock, LW_SHARED);
@@ -590,12 +632,12 @@ lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
}
while (true)
{
int this_chunk = Min(nblocks - i, BLOCKS_PER_CHUNK - chunk_offs);
int this_chunk = Min(nblocks - i, lfc_blocks_per_chunk - chunk_offs);
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
if (entry != NULL)
{
for (; chunk_offs < BLOCKS_PER_CHUNK && i < nblocks; chunk_offs++, i++)
for (; chunk_offs < lfc_blocks_per_chunk && i < nblocks; chunk_offs++, i++)
{
if (GET_STATE(entry, chunk_offs) != UNAVAILABLE)
{
@@ -619,9 +661,9 @@ lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
* Prepare for the next iteration. We don't unlock here, as that'd
* probably be more expensive than the gains it'd get us.
*/
tag.blockNum = (blkno + i) & ~(BLOCKS_PER_CHUNK - 1);
chunk_offs = BLOCK_TO_CHUNK_OFF(blkno + i);
tag.blockNum = (blkno + i) - chunk_offs;
hash = get_hash_value(lfc_hash, &tag);
chunk_offs = (blkno + i) & (BLOCKS_PER_CHUNK - 1);
}
LWLockRelease(lfc_lock);
@@ -696,9 +738,9 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
while (nblocks > 0)
{
struct iovec iov[PG_IOV_MAX];
int8 chunk_mask[BLOCKS_PER_CHUNK / 8] = {0};
int chunk_offs = (blkno & (BLOCKS_PER_CHUNK - 1));
int blocks_in_chunk = Min(nblocks, BLOCKS_PER_CHUNK - (blkno % BLOCKS_PER_CHUNK));
uint8 chunk_mask[MAX_BLOCKS_PER_CHUNK / 8] = {0};
int chunk_offs = BLOCK_TO_CHUNK_OFF(blkno);
int blocks_in_chunk = Min(nblocks, lfc_blocks_per_chunk - chunk_offs);
int iteration_hits = 0;
int iteration_misses = 0;
uint64 io_time_us = 0;
@@ -786,8 +828,10 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
/* Unlink entry from LRU list to pin it for the duration of IO operation */
if (entry->access_count++ == 0)
{
lfc_ctl->pinned += 1;
dlist_delete(&entry->list_node);
}
generation = lfc_ctl->generation;
entry_offset = entry->offset;
@@ -836,7 +880,7 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
if (iteration_hits != 0)
{
/* chunk offset (# of pages) into the LFC file */
off_t first_read_offset = (off_t) entry_offset * BLOCKS_PER_CHUNK;
off_t first_read_offset = (off_t) entry_offset * lfc_blocks_per_chunk;
int nwrite = iov_last_used - first_block_in_chunk_read;
/* offset of first IOV */
first_read_offset += chunk_offs + first_block_in_chunk_read;
@@ -884,7 +928,10 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
CriticalAssert(entry->access_count > 0);
if (--entry->access_count == 0)
{
lfc_ctl->pinned -= 1;
dlist_push_tail(&lfc_ctl->lru, &entry->list_node);
}
}
else
{
@@ -961,7 +1008,7 @@ lfc_init_new_entry(FileCacheEntry* entry, uint32 hash)
FileCacheEntry *victim = dlist_container(FileCacheEntry, list_node,
dlist_pop_head_node(&lfc_ctl->lru));
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
for (int i = 0; i < lfc_blocks_per_chunk; i++)
{
bool is_page_cached = GET_STATE(victim, i) == AVAILABLE;
lfc_ctl->used_pages -= is_page_cached;
@@ -979,14 +1026,14 @@ lfc_init_new_entry(FileCacheEntry* entry, uint32 hash)
/* Can't add this chunk - we don't have the space for it */
hash_search_with_hash_value(lfc_hash, &entry->key, hash,
HASH_REMOVE, NULL);
return false;
}
entry->access_count = 1;
entry->hash = hash;
lfc_ctl->pinned += 1;
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
for (int i = 0; i < lfc_blocks_per_chunk; i++)
SET_STATE(entry, i, UNAVAILABLE);
return true;
@@ -1031,7 +1078,7 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
FileCacheBlockState state;
XLogRecPtr lwlsn;
int chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1);
int chunk_offs = BLOCK_TO_CHUNK_OFF(blkno);
if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */
return false;
@@ -1041,7 +1088,7 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber);
tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK - 1);
tag.blockNum = blkno - chunk_offs;
hash = get_hash_value(lfc_hash, &tag);
cv = &lfc_ctl->cv[hash % N_COND_VARS];
@@ -1052,7 +1099,7 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
LWLockRelease(lfc_lock);
return false;
}
lwlsn = neon_get_lwlsn(rinfo, forknum, blkno);
if (lwlsn > lsn)
@@ -1081,7 +1128,10 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
* operation
*/
if (entry->access_count++ == 0)
{
lfc_ctl->pinned += 1;
dlist_delete(&entry->list_node);
}
}
else
{
@@ -1106,7 +1156,7 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_WRITE);
INSTR_TIME_SET_CURRENT(io_start);
rc = pwrite(lfc_desc, buffer, BLCKSZ,
((off_t) entry_offset * BLOCKS_PER_CHUNK + chunk_offs) * BLCKSZ);
((off_t) entry_offset * lfc_blocks_per_chunk + chunk_offs) * BLCKSZ);
INSTR_TIME_SET_CURRENT(io_end);
pgstat_report_wait_end();
@@ -1132,7 +1182,10 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
inc_page_cache_write_wait(time_spent_us);
if (--entry->access_count == 0)
{
lfc_ctl->pinned -= 1;
dlist_push_tail(&lfc_ctl->lru, &entry->list_node);
}
state = GET_STATE(entry, chunk_offs);
if (state == REQUESTED) {
@@ -1199,8 +1252,8 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
while (nblocks > 0)
{
struct iovec iov[PG_IOV_MAX];
int chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1);
int blocks_in_chunk = Min(nblocks, BLOCKS_PER_CHUNK - (blkno % BLOCKS_PER_CHUNK));
int chunk_offs = BLOCK_TO_CHUNK_OFF(blkno);
int blocks_in_chunk = Min(nblocks, lfc_blocks_per_chunk - chunk_offs);
instr_time io_start, io_end;
ConditionVariable* cv;
@@ -1212,7 +1265,7 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
iov[i].iov_len = BLCKSZ;
}
tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK - 1);
tag.blockNum = blkno - chunk_offs;
hash = get_hash_value(lfc_hash, &tag);
cv = &lfc_ctl->cv[hash % N_COND_VARS];
@@ -1232,7 +1285,10 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
* operation
*/
if (entry->access_count++ == 0)
{
lfc_ctl->pinned += 1;
dlist_delete(&entry->list_node);
}
}
else
{
@@ -1285,7 +1341,7 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_WRITE);
INSTR_TIME_SET_CURRENT(io_start);
rc = pwritev(lfc_desc, iov, blocks_in_chunk,
((off_t) entry_offset * BLOCKS_PER_CHUNK + chunk_offs) * BLCKSZ);
((off_t) entry_offset * lfc_blocks_per_chunk + chunk_offs) * BLCKSZ);
INSTR_TIME_SET_CURRENT(io_end);
pgstat_report_wait_end();
@@ -1312,7 +1368,10 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
inc_page_cache_write_wait(time_spent_us);
if (--entry->access_count == 0)
{
lfc_ctl->pinned -= 1;
dlist_push_tail(&lfc_ctl->lru, &entry->list_node);
}
for (int i = 0; i < blocks_in_chunk; i++)
{
@@ -1438,7 +1497,12 @@ neon_get_lfc_stats(PG_FUNCTION_ARGS)
break;
case 8:
key = "file_cache_chunk_size_pages";
value = BLOCKS_PER_CHUNK;
value = lfc_blocks_per_chunk;
break;
case 9:
key = "file_cache_chunks_pinned";
if (lfc_ctl)
value = lfc_ctl->pinned;
break;
default:
SRF_RETURN_DONE(funcctx);
@@ -1566,7 +1630,7 @@ local_cache_pages(PG_FUNCTION_ARGS)
/* Skip hole tags */
if (NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key)) != 0)
{
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
for (int i = 0; i < lfc_blocks_per_chunk; i++)
n_pages += GET_STATE(entry, i) == AVAILABLE;
}
}
@@ -1594,13 +1658,13 @@ local_cache_pages(PG_FUNCTION_ARGS)
hash_seq_init(&status, lfc_hash);
while ((entry = hash_seq_search(&status)) != NULL)
{
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
for (int i = 0; i < lfc_blocks_per_chunk; i++)
{
if (NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key)) != 0)
{
if (GET_STATE(entry, i) == AVAILABLE)
{
fctx->record[n].pageoffs = entry->offset * BLOCKS_PER_CHUNK + i;
fctx->record[n].pageoffs = entry->offset * lfc_blocks_per_chunk + i;
fctx->record[n].relfilenode = NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key));
fctx->record[n].reltablespace = NInfoGetSpcOid(BufTagGetNRelFileInfo(entry->key));
fctx->record[n].reldatabase = NInfoGetDbOid(BufTagGetNRelFileInfo(entry->key));

View File

@@ -48,7 +48,6 @@
#define MIN_RECONNECT_INTERVAL_USEC 1000
#define MAX_RECONNECT_INTERVAL_USEC 1000000
enum NeonComputeMode {
CP_MODE_PRIMARY = 0,
CP_MODE_REPLICA,
@@ -167,6 +166,9 @@ typedef struct
WaitEventSet *wes_read;
} PageServer;
static uint32 local_request_counter;
#define GENERATE_REQUEST_ID() (((NeonRequestId)MyProcPid << 32) | ++local_request_counter)
static PageServer page_servers[MAX_SHARDS];
static bool pageserver_flush(shardno_t shard_no);
@@ -994,6 +996,7 @@ pageserver_send(shardno_t shard_no, NeonRequest *request)
pageserver_conn = NULL;
}
request->reqid = GENERATE_REQUEST_ID();
req_buff = nm_pack_request(request);
/*

View File

@@ -396,7 +396,7 @@ SetLastWrittenLSNForBlockRangeInternal(XLogRecPtr lsn,
XLogRecPtr
neon_set_lwlsn_block_range(XLogRecPtr lsn, NRelFileInfo rlocator, ForkNumber forknum, BlockNumber from, BlockNumber n_blocks)
{
if (lsn == InvalidXLogRecPtr || n_blocks == 0 || LwLsnCache->lastWrittenLsnCacheSize == 0)
if (lsn < FirstNormalUnloggedLSN || n_blocks == 0 || LwLsnCache->lastWrittenLsnCacheSize == 0)
return lsn;
LWLockAcquire(LastWrittenLsnLock, LW_EXCLUSIVE);
@@ -505,4 +505,5 @@ neon_set_lwlsn_db(XLogRecPtr lsn)
{
NRelFileInfo dummyNode = {InvalidOid, InvalidOid, InvalidOid};
return neon_set_lwlsn_block(lsn, dummyNode, MAIN_FORKNUM, 0);
}
}

View File

@@ -65,7 +65,6 @@ typedef enum {
SLRU_MULTIXACT_OFFSETS
} SlruKind;
/*--
* supertype of all the Neon*Request structs below.
*
@@ -129,6 +128,7 @@ typedef struct
int segno;
} NeonGetSlruSegmentRequest;
/* supertype of all the Neon*Response structs below */
typedef NeonMessage NeonResponse;
@@ -187,6 +187,7 @@ typedef struct
{
/*
* Send this request to the PageServer associated with this shard.
* This function assigns request_id to the request which can be extracted by caller from request struct.
*/
bool (*send) (shardno_t shard_no, NeonRequest * request);
/*

View File

@@ -7,13 +7,14 @@ use std::{net::SocketAddr, sync::Arc};
use anyhow::{Context, anyhow, bail, ensure};
use clap::Arg;
use futures::TryFutureExt;
use futures::future::Either;
use futures::{FutureExt, TryFutureExt};
use itertools::Itertools;
use rustls::crypto::ring;
use rustls::pki_types::PrivateKeyDer;
use tokio::io::{AsyncRead, AsyncWrite};
use rustls::pki_types::{DnsName, PrivateKeyDer};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::net::TcpListener;
use tokio_rustls::TlsConnector;
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, error, info};
use utils::project_git_version;
@@ -38,6 +39,12 @@ fn cli() -> clap::Command {
.help("listen for incoming client connections on ip:port")
.default_value("127.0.0.1:4432"),
)
.arg(
Arg::new("listen-tls")
.long("listen-tls")
.help("listen for incoming client connections on ip:port, requiring TLS to compute")
.default_value("127.0.0.1:4433"),
)
.arg(
Arg::new("tls-key")
.short('k')
@@ -122,31 +129,58 @@ pub async fn run() -> anyhow::Result<()> {
_ => bail!("tls-key and tls-cert must be specified"),
};
let compute_tls_config =
Arc::new(crate::tls::client_config::compute_client_config_with_root_certs()?);
// Start listening for incoming client connections
let proxy_address: SocketAddr = args
.get_one::<String>("listen")
.expect("string argument defined")
.expect("listen argument defined")
.parse()?;
let proxy_address_compute_tls: SocketAddr = args
.get_one::<String>("listen-tls")
.expect("listen-tls argument defined")
.parse()?;
info!("Starting sni router on {proxy_address}");
info!("Starting sni router on {proxy_address_compute_tls}");
let proxy_listener = TcpListener::bind(proxy_address).await?;
let proxy_listener_compute_tls = TcpListener::bind(proxy_address_compute_tls).await?;
let cancellation_token = CancellationToken::new();
let dest = Arc::new(destination);
let main = tokio::spawn(task_main(
Arc::new(destination),
tls_config,
dest.clone(),
tls_config.clone(),
None,
tls_server_end_point,
proxy_listener,
cancellation_token.clone(),
));
))
.map(crate::error::flatten_err);
let main_tls = tokio::spawn(task_main(
dest,
tls_config,
Some(compute_tls_config),
tls_server_end_point,
proxy_listener_compute_tls,
cancellation_token.clone(),
))
.map(crate::error::flatten_err);
let signals_task = tokio::spawn(crate::signals::handle(cancellation_token, || {}));
// the signal task cant ever succeed.
// the main task can error, or can succeed on cancellation.
// we want to immediately exit on either of these cases
let main = futures::future::try_join(main, main_tls);
let signal = match futures::future::select(signals_task, main).await {
Either::Left((res, _)) => crate::error::flatten_err(res)?,
Either::Right((res, _)) => return crate::error::flatten_err(res),
Either::Right((res, _)) => {
res?;
return Ok(());
}
};
// maintenance tasks return `Infallible` success values, this is an impossible value
@@ -157,6 +191,7 @@ pub async fn run() -> anyhow::Result<()> {
async fn task_main(
dest_suffix: Arc<String>,
tls_config: Arc<rustls::ServerConfig>,
compute_tls_config: Option<Arc<rustls::ClientConfig>>,
tls_server_end_point: TlsServerEndPoint,
listener: tokio::net::TcpListener,
cancellation_token: CancellationToken,
@@ -175,6 +210,7 @@ async fn task_main(
let session_id = uuid::Uuid::new_v4();
let tls_config = Arc::clone(&tls_config);
let dest_suffix = Arc::clone(&dest_suffix);
let compute_tls_config = compute_tls_config.clone();
connections.spawn(
async move {
@@ -192,7 +228,15 @@ async fn task_main(
crate::metrics::Protocol::SniRouter,
"sni",
);
handle_client(ctx, dest_suffix, tls_config, tls_server_end_point, socket).await
handle_client(
ctx,
dest_suffix,
tls_config,
compute_tls_config,
tls_server_end_point,
socket,
)
.await
}
.unwrap_or_else(|e| {
// Acknowledge that the task has finished with an error.
@@ -268,6 +312,7 @@ async fn handle_client(
ctx: RequestContext,
dest_suffix: Arc<String>,
tls_config: Arc<rustls::ServerConfig>,
compute_tls_config: Option<Arc<rustls::ClientConfig>>,
tls_server_end_point: TlsServerEndPoint,
stream: impl AsyncRead + AsyncWrite + Unpin,
) -> anyhow::Result<()> {
@@ -288,7 +333,33 @@ async fn handle_client(
info!("destination: {}", destination);
let mut client = tokio::net::TcpStream::connect(destination).await?;
let mut client = tokio::net::TcpStream::connect(&destination).await?;
let client = if let Some(compute_tls_config) = compute_tls_config {
info!("upgrading TLS");
// send SslRequest
client
.write_all(b"\x00\x00\x00\x08\x04\xd2\x16\x2f")
.await?;
// wait for S/N respons
let mut resp = b'N';
client.read_exact(std::slice::from_mut(&mut resp)).await?;
// error if not S
ensure!(resp == b'S', "compute refused TLS");
// upgrade to TLS.
let domain = DnsName::try_from(destination)?;
let domain = rustls::pki_types::ServerName::DnsName(domain);
let client = TlsConnector::from(compute_tls_config)
.connect(domain, client)
.await?;
Connection::Tls(client)
} else {
Connection::Raw(client)
};
// doesn't yet matter as pg-sni-router doesn't report analytics logs
ctx.set_success();
@@ -297,9 +368,19 @@ async fn handle_client(
// Starting from here we only proxy the client's traffic.
info!("performing the proxy pass...");
match copy_bidirectional_client_compute(&mut tls_stream, &mut client).await {
let res = match client {
Connection::Raw(mut c) => copy_bidirectional_client_compute(&mut tls_stream, &mut c).await,
Connection::Tls(mut c) => copy_bidirectional_client_compute(&mut tls_stream, &mut c).await,
};
match res {
Ok(_) => Ok(()),
Err(ErrorSource::Client(err)) => Err(err).context("client"),
Err(ErrorSource::Compute(err)) => Err(err).context("compute"),
}
}
enum Connection {
Raw(tokio::net::TcpStream),
Tls(tokio_rustls::client::TlsStream<tokio::net::TcpStream>),
}