Compare commits

..

22 Commits

Author SHA1 Message Date
BodoBolero
6f913f2068 fix to use lakebase access token 2025-07-30 17:09:36 +02:00
BodoBolero
bb19dc96f3 change oidc host 2025-07-30 15:10:58 +02:00
BodoBolero
073d46ab80 run workflow before merged into main 2025-07-30 15:05:32 +02:00
BodoBolero
cc26e24bd8 Trigger GitHub to register workflow 2025-07-30 15:03:19 +02:00
BodoBolero
3c8d67ae44 try first version of benchmark on lakebase 2025-07-30 14:44:22 +02:00
BodoBolero
a6cb8a7f72 add project delete and bearer token actions 2025-07-29 17:03:17 +02:00
BodoBolero
01c7de98a2 first draft of project create for lakebase 2025-07-28 16:59:08 +02:00
Tristan Partin
a6e0baf31a [BRC-1405] Mount databricks pg_hba and pg_ident from configmap (#12733)
## Problem

For certificate auth, we need to configure pg_hba and pg_ident for it to
work.

HCC needs to mount this config map to all pg compute pod.

## Summary of changes

Create `databricks_pg_hba` and `databricks_pg_ident` to configure where
the files are located on the pod. These configs are pass down to
`compute_ctl`. Compute_ctl uses these config to update `pg_hba.conf` and
`pg_ident.conf` file.

We append `include_if_exists {databricks_pg_hba}` to `pg_hba.conf` and
similarly to `pg_ident.conf`. So that it will refer to databricks config
file without much change to existing pg default config file.

---------

Co-authored-by: Jarupat Jisarojito <jarupat.jisarojito@databricks.com>
Co-authored-by: William Huang <william.huang@databricks.com>
Co-authored-by: HaoyuHuang <haoyu.huang.68@gmail.com>
2025-07-25 20:50:03 +00:00
Christian Schwarz
19b74b8837 fix(page_service): getpage requests don't hold applied_gc_cutoff_lsn guard (#12743)
Before this PR, getpage requests wouldn't hold the
`applied_gc_cutoff_lsn` guard until they were done.

Theoretical impact: if we’re not holding the `RcuReadGuard`, gc can
theoretically concurrently delete reconstruct data that we need to
reconstruct the page.

I don't think this practically occurs in production because the odds of
it happening are quite low, especially for primary read_write computes.
But RO replicas / standby_horizon relies on correct
`applied_gc_cutofff_lsn`, so, I'm fixing this as part of the work ok
replacing standby_horizon propagation mechanism with leases (LKB-88).

The change is feature-gated with a feature flag, and evaluated once when
entering `handle_pagestream` to avoid performance impact.

For observability, we add a field to the `handle_pagestream` span, and a
slow-log to the place in `gc_loop` where it waits for the in-flight
RcuReadGuard's to drain.

refs
- fixes https://databricks.atlassian.net/browse/LKB-2572
- standby_horizon leases epic:
https://databricks.atlassian.net/browse/LKB-2572

---------

Co-authored-by: Christian Schwarz <Christian Schwarz>
2025-07-25 20:25:04 +00:00
Folke Behrens
25718e324a proxy: Define service_info metric showing the run state (#12749)
## Problem

Monitoring dashboards show aggregates of all proxy instances, including
terminating ones. This can skew the results or make graphs less
readable. Also, alerts must be tuned to ignore certain signals from
terminating proxies.

## Summary of changes

Add a `service_info` metric currently with one label, `state`, showing
if an instance is in state `init`, `running`, or `terminating`. The
metric can be joined with other metrics to filter the presented time
series.
2025-07-25 18:27:21 +00:00
Dmitrii Kovalkov
ac8f44c70e tests: stop ps immediately in test_ps_unavailable_after_delete (#12728)
## Problem
test_ps_unavailable_after_delete is flaky. All test failures I've looked
at are because of ERROR log messages in pageserver, which happen because
storage controller tries runs a reconciliations during the graceful
shutdown of the pageserver.

I wasn't able to reproduce it locally, but I think stopping PS
immediately instead of gracefully should help. If not, we might just
silence those errors.

- Closes: https://databricks.atlassian.net/browse/LKB-745
2025-07-25 18:09:34 +00:00
Conrad Ludgate
d09664f039 [proxy] replace TimedLru with moka (#12726)
LKB-2536 TimedLru is hard to maintain. Let's use moka instead. Stacked
on top of #12710.
2025-07-25 17:39:48 +00:00
Mikhail
6689d6fd89 LFC prewarm perftest fixes: use existing staging project (#12651)
https://github.com/neondatabase/cloud/issues/19011

- Prewarm config changes are not publicly available.
  Correct the test by using a pre-filled 50 GB project on staging
- Create extension neon with schema neon to fix read performance tests
on staging, error example in
https://neon-github-public-dev.s3.amazonaws.com/reports/main/16483462789/index.html#suites/3d632da6dda4a70f5b4bd24904ab444c/919841e331089fc4/
- Don't create extra endpoint in LFC prewarm performance tests
2025-07-25 16:56:41 +00:00
Tristan Partin
33b400beae [BRC-1425] Plumb through and set the requisite GUCs when starting the compute instance (#12732)
## Problem

We need the set the following Postgres GUCs to the correct value before
starting Postgres in the compute instance:

```
databricks.workspace_url
databricks.enable_databricks_identity_login
databricks.enable_sql_restrictions
```

## Summary of changes

Plumbed through `workspace_url` and other GUC settings via
`DatabricksSettings` in `ComputeSpec`. The spec is sent to the compute
instance when it starts up and the GUCs are written to `postgresql.conf`
before the postgres process is launched.

---------

Co-authored-by: Jarupat Jisarojito <jarupat.jisarojito@databricks.com>
Co-authored-by: William Huang <william.huang@databricks.com>
2025-07-25 15:20:05 +00:00
Tristan Partin
ca07f7dba5 Copy pg server cert and key to pgdata with correct permission (#12731)
## Problem

Copy certificate and key from secret mount directory to `pgdata`
directory where `postgres` is the owner and we can set the key
permission to 0600.

## Summary of changes

- Added new pgparam `pg_compute_tls_settings` to specify where k8s
secret for certificate and key are mounted.
- Added a new field to `ComputeSpec` called `databricks_settings`. This
is a struct that will be used to store any other settings that needs to
be propagate to Compute but should not be persisted to `ComputeSpec` in
the database.
- Then when the compute container start up, as part of `prepare_pgdata`
function, it will copied `server.key` and `server.crt` from k8s mounted
directory to `pgdata` directory.

## How is this tested?

Add unit tests.
Manual test via KIND

Co-authored-by: Jarupat Jisarojito <jarupat.jisarojito@databricks.com>
2025-07-25 15:05:05 +00:00
Vlad Lazar
b0dfe0ffa6 storcon: attempt all non-essential location config calls during reconciliations (#12745)
## Problem

We saw the following in the field:

Context and observations:
* The storage controller keeps track of the latest generations and the
pageserver that issued the latest generation in the database
* When the storage controller needs to proxy a request (e.g. timeline
creation) to the pageservers, it will find use the pageserver that
issued the latest generation from the db (generation_pageserver).
* pageserver-2.cell-2 got into a bad state and wasn't able to apply
location_config (e.g. detach a shard)

What happened:
1. pageserver-2.cell-2 was a secondary for our shard since we were not
able to detach it
2. control plane asked to detach a tenant (presumably because it was
idle)
a. In response storcon clears the generation_pageserver from the db and
attempts to detach all locations
b. it tries to detach pageserver-2.cell-2 first, but fails, which fails
the entire reconciliation leaving the good attached location still there
c. return success to cplane

3. control plane asks to re-attach the tenant
a. In response storcon performs a reconciliation
b. it finds that the observed state matches the intent (remember we did
not detach the primary at step(2))
c. skips incrementing the genration and setting the
generation_pageserver column

Now any requests that need to be proxied to pageservers and rely on the
generation_pageserver db column fail because that's not set

## Summary of changes

1. We do all non-essential location config calls (setting up
secondaries,
detaches) at the end of the reconciliation. Previously, we bailed out
of the reconciliation on the first failure. With this patch we attempt
all of the RPCs.
This allows the observed state to update even if another RPC failed for
unrelated reasons.

2. If the overall reconciliation failed, we don't want to remove nodes
from the
observed state as a safe-guard. With the previous patch, we'll get a
deletion delta to process, which would be ignored. Ignoring it is not
the right thing to do since it's out of sync with the db state.
Hence, on reconciliation failures map deletion from the observed state
to the uncertain state. Future reconciliation will query the node to
refresh their observed state.

Closes LKB-204
2025-07-25 14:03:17 +00:00
Erik Grinaker
185ead8395 pageserver: verify gRPC GetPages on correct shard (#12722)
Verify that gRPC `GetPageRequest` has been sent to the shard that owns
the pages. This avoid spurious `NotFound` errors if a compute misroutes
a request, which can appear scarier (e.g. data loss).

Touches [LKB-191](https://databricks.atlassian.net/browse/LKB-191).
2025-07-25 13:43:04 +00:00
Erik Grinaker
37e322438b pageserver: document gRPC compute accessibility (#12724)
Document that the Pageserver gRPC port is accessible by computes, and
should not provide internal services.

Touches [LKB-191](https://databricks.atlassian.net/browse/LKB-191).
2025-07-25 13:35:44 +00:00
Gustavo Bazan
fca2c32e59 [ci/docker] task: Apply some quick wins for tools dockerfile (#12740)
## Problem

The Dockerfile for build tools has some small issues that are easy to
fix to make it follow some of docker best practices

## Summary of changes

Apply some small quick wins on the Dockerfile for build tools

- Usage of apt-get over apt
- usage of --no-cache-dir for pip install
2025-07-25 12:39:01 +00:00
Conrad Ludgate
d19aebcf12 [proxy] introduce moka for the project-info cache (#12710)
## Problem

LKB-2502 The garbage collection of the project info cache is garbage. 

What we observed: If we get unlucky, we might throw away a very hot
entry if the cache is full. The GC loop is dependent on getting a lucky
shard of the projects2ep table that clears a lot of cold entries. The GC
does not take into account active use, and the interval it runs at is
too sparse to do any good.

Can we switch to a proper cache implementation?

Complications:
1. We need to invalidate by project/account.
2. We need to expire based on `retry_delay_ms`.

## Summary of changes

1. Replace `retry_delay_ms: Duration` with `retry_at: Instant` when
deserializing.
2. Split the EndpointControls from the RoleControls into two different
caches.
3. Introduce an expiry policy based on error retry info.
4. Introduce `moka` as a dependency, replacing our `TimedLru`.

See the follow up PR for changing all TimedLru instances to use moka:
#12726.
2025-07-25 11:40:47 +00:00
Conrad Ludgate
a70a5bccff move subzero_core to proxy libs (#12742)
We have a dedicated libs folder for proxy related libraries. Let's move
the subzero_core stub there.
2025-07-25 10:44:28 +00:00
Conrad Ludgate
d9cedb4a95 [tokio-postgres] fix regression in buffer reuse (#12739)
Follow up to #12701, which introduced a new regression. When profiling
locally I noticed that writes have the tendency to always reallocate. On
investigation I found that even if the `Connection`'s write buffer is
empty, if it still shares the same data pointer as the `Client`'s write
buffer then the client cannot reclaim it.

The best way I found to fix this is to just drop the `Connection`'s
write buffer each time we fully flush it.

Additionally, I remembered that `BytesMut` has an `unsplit` method which
is allows even better sharing over the previous optimisation I had when
'encoding'.
2025-07-25 09:03:21 +00:00
58 changed files with 1458 additions and 889 deletions

View File

@@ -27,11 +27,14 @@ config-variables:
- HETZNER_CACHE_BUCKET
- HETZNER_CACHE_ENDPOINT
- HETZNER_CACHE_REGION
- LAKEBASE_API_HOST
- LAKEBASE_OAUTH_CLIENT_ID
- LAKEBASE_ORG_ID
- NEON_DEV_AWS_ACCOUNT_ID
- NEON_PROD_AWS_ACCOUNT_ID
- PGREGRESS_PG16_PROJECT_ID
- PGREGRESS_PG17_PROJECT_ID
- PREWARM_PGBENCH_SIZE
- PREWARM_PROJECT_ID
- REMOTE_STORAGE_AZURE_CONTAINER
- REMOTE_STORAGE_AZURE_REGION
- SLACK_CICD_CHANNEL_ID

View File

@@ -0,0 +1,126 @@
name: 'Create Lakebase Project'
description: 'Create Lakebase Project using API'
inputs:
access_token:
description: 'Lakebase API access token'
required: true
org_id:
description: 'Organization ID, required'
required: true
api_host:
description: 'Lakebase API host, e.g. dbc-55e65913-66de.dev.databricks.com/lakebase-console'
required: true
postgres_version:
description: 'Postgres version; default is 16'
default: '17'
compute_units:
description: '[Min, Max] compute units'
default: '[1, 1]'
psql_path:
description: 'Path to psql binary - it is caller responsibility to provision the psql binary'
required: false
default: '/tmp/neon/pg_install/v16/bin/psql'
libpq_lib_path:
description: 'Path to directory containing libpq library - it is caller responsibility to provision the libpq library'
required: false
default: '/tmp/neon/pg_install/v16/lib'
project_settings:
description: 'A JSON object with project settings'
required: false
default: '{}'
fixed_hostname:
description: 'Fixed hostname to use for connection URI'
required: false
default: 'k8s-dpingres-serverle-09ade1e9e9-0d7f675c53b35938.elb.us-west-2.amazonaws.com'
region_id:
description: 'Project region ID'
required: false
default: 'aws-us-east-2'
outputs:
dsn:
description: 'Created Project DSN (for main database)'
value: ${{ steps.create-neon-project.outputs.dsn }}
project_id:
description: 'Created Project ID'
value: ${{ steps.create-neon-project.outputs.project_id }}
runs:
using: "composite"
steps:
- name: Create Lakebase Project
id: create-lakebase-project
# A shell without `set -x` to not to expose password/dsn in logs
shell: bash -euo pipefail {0}
run: |
res=$(curl \
"https://${API_HOST}/api/v2/projects" \
-w "%{http_code}" \
--header "Accept: application/json" \
--header "Content-Type: application/json" \
--header "Authorization: Bearer ${ACCESS_TOKEN}" \
--data "{
\"project\": {
\"org_id\": \"${ORG_ID}\",
\"name\": \"Created by actions/lakebase-project-create; GITHUB_RUN_ID=${GITHUB_RUN_ID}\",
\"pg_version\": ${POSTGRES_VERSION},
\"region_id\": \"${REGION_ID}\",
\"provisioner\": \"k8s-neonvm\",
\"autoscaling_limit_min_cu\": ${MIN_CU},
\"autoscaling_limit_max_cu\": ${MAX_CU},
\"settings\": ${PROJECT_SETTINGS}
}
}")
code=${res: -3}
if [[ ${code} -ge 400 ]]; then
echo Request failed with error code ${code}
echo ${res::-3}
exit 1
else
project=${res::-3}
fi
# Mask password
echo "::add-mask::$(echo $project | jq --raw-output '.roles[] | select(.name != "web_access") | .password')"
original_dsn=$(echo $project | jq --raw-output '.connection_uris[0].connection_uri')
echo "::add-mask::${original_dsn}"
# Extract endpoint ID from the original hostname
endpoint_id=$(echo "$original_dsn" | sed -n 's/.*@\(ep-[^.]*\)\..*/\1/p')
# Parse original URI components
user_pass=$(echo "$original_dsn" | sed -n 's/postgresql:\/\/\([^@]*\)@.*/\1/p')
database=$(echo "$original_dsn" | sed -n 's/.*\/\([^?]*\).*/\1/p')
# Construct the corrected DSN with fixed hostname and endpoint in options
if [[ "$original_dsn" == *"?"* ]]; then
# Extract existing query parameters
existing_params=$(echo "$original_dsn" | sed -n 's/.*?\(.*\)/\1/p')
dsn="postgresql://${user_pass}@${FIXED_HOSTNAME}/${database}?${existing_params}&options=endpoint%3d${endpoint_id}"
else
dsn="postgresql://${user_pass}@${FIXED_HOSTNAME}/${database}?options=endpoint%3d${endpoint_id}"
fi
echo "::add-mask::${dsn}"
echo "dsn=${dsn}" >> $GITHUB_OUTPUT
project_id=$(echo $project | jq --raw-output '.project.id')
echo "project_id=${project_id}" >> $GITHUB_OUTPUT
echo "Project ${project_id} has been created"
env:
API_HOST: ${{ inputs.api_host }}
ACCESS_TOKEN: ${{ inputs.access_token }}
ORG_ID: ${{ inputs.org_id }}
REGION_ID: ${{ inputs.region_id }}
POSTGRES_VERSION: ${{ inputs.postgres_version }}
MIN_CU: ${{ fromJSON(inputs.compute_units)[0] }}
MAX_CU: ${{ fromJSON(inputs.compute_units)[1] }}
PSQL: ${{ inputs.psql_path }}
LD_LIBRARY_PATH: ${{ inputs.libpq_lib_path }}
PROJECT_SETTINGS: ${{ inputs.project_settings }}
FIXED_HOSTNAME: ${{ inputs.fixed_hostname }}

View File

@@ -0,0 +1,39 @@
name: 'Delete Neon Project'
description: 'Delete Neon Project using API'
inputs:
access_token:
description: 'Lakebase API access token'
required: true
org_id:
description: 'Organization ID, required'
required: true
api_host:
description: 'Lakebase API host, e.g. dbc-55e65913-66de.dev.databricks.com/ajax-api/2.0/lakebase-console'
required: true
project_id:
description: 'ID of the Project to delete'
required: true
runs:
using: "composite"
steps:
- name: Delete Lakebase Project
# Do not try to delete a project if .github/actions/neon-project-create failed before
if: ${{ inputs.project_id != '' }}
shell: bash -euxo pipefail {0}
run: |
curl \
"https://${API_HOST}/api/v2/projects/${PROJECT_ID}" \
--fail \
--request DELETE \
--header "Accept: application/json" \
--header "Content-Type: application/json" \
--header "Authorization: Bearer ${ACCESS_TOKEN}"
echo "Project ${PROJECT_ID} has been deleted"
env:
API_HOST: ${{ inputs.api_host }}
ACCESS_TOKEN: ${{ inputs.access_token }}
ORG_ID: ${{ inputs.org_id }}
PROJECT_ID: ${{ inputs.project_id }}

View File

@@ -418,7 +418,7 @@ jobs:
statuses: write
id-token: write # aws-actions/configure-aws-credentials
env:
PGBENCH_SIZE: ${{ vars.PREWARM_PGBENCH_SIZE }}
PROJECT_ID: ${{ vars.PREWARM_PROJECT_ID }}
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
DEFAULT_PG_VERSION: 17
TEST_OUTPUT: /tmp/test_output

View File

@@ -0,0 +1,166 @@
name: Lakebase Benchmarking
on:
# uncomment to run on push for debugging your PR
push:
branches: [ bodobolero/lakebase_perf_tests ]
workflow_dispatch: # adds ability to run this manually
inputs:
postgres_version:
description: 'Postgres version'
required: false
default: '17'
save_perf_report:
type: boolean
description: 'Publish perf report'
required: false
default: false
defaults:
run:
shell: bash -euxo pipefail {0}
concurrency:
# Allow only one workflow per any non-`main` branch.
group: ${{ github.workflow }}-${{ github.ref_name }}-${{ github.ref_name == 'main' && github.sha || 'anysha' }}
cancel-in-progress: true
jobs:
lakebase-pgbench:
permissions:
contents: write
statuses: write
id-token: write # aws-actions/configure-aws-credentials
env:
TEST_PG_BENCH_DURATIONS_MATRIX: "60m"
TEST_PG_BENCH_SCALES_MATRIX: "10gb"
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
PG_VERSION: ${{ github.event.inputs.postgres_version || '17' }}
TEST_OUTPUT: /tmp/test_output
BUILD_TYPE: remote
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || false }}
PLATFORM: "lakebase-captest-new"
# TODO: for lakehouse test-shard which is probably deployed in US-West we need to change the runner
# to us-west to get correct OLTP latencies due to added speed of light latency
runs-on: [ self-hosted, us-east-2, x64 ]
container:
image: ghcr.io/neondatabase/build-tools:pinned-bookworm
credentials:
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
options: --init
# Increase timeout to 8h, default timeout is 6h
timeout-minutes: 480
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
with:
egress-policy: audit
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@e3dd6a429d7300a6a4c196c26e071d42e0343502 # v4.0.2
with:
aws-region: eu-central-1
role-to-assume: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
role-duration-seconds: 18000 # 5 hours
- name: Download Neon artifact
uses: ./.github/actions/download
with:
name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact
path: /tmp/neon/
prefix: latest
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
## TODO, currently we cannot really specify a small min max CU for lakebase project
## and the semantic of a CU is different from Neon, so we need to carefully map
## compute sizes before comparing results
- name: Create Lakebase Project
id: create-lakebase-project
uses: ./.github/actions/lakebase-project-create
with:
api_host: ${{ vars.LAKEBASE_API_HOST }}
org_id: ${{ vars.LAKEBASE_ORG_ID }}
postgres_version: ${{ env.PG_VERSION }}
compute_units: '[1, 1]'
access_token: ${{ secrets.LAKEBASE_ACCESS_TOKEN }}
- name: Benchmark init
uses: ./.github/actions/run-python-test-set
with:
build_type: ${{ env.BUILD_TYPE }}
test_selection: performance
run_in_parallel: false
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 21600 -k test_pgbench_remote_init
pg_version: ${{ env.PG_VERSION }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
BENCHMARK_CONNSTR: ${{ steps.create-lakebase-project.outputs.dsn }}
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
- name: Benchmark simple-update
uses: ./.github/actions/run-python-test-set
with:
build_type: ${{ env.BUILD_TYPE }}
test_selection: performance
run_in_parallel: false
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 21600 -k test_pgbench_remote_simple_update
pg_version: ${{ env.PG_VERSION }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
BENCHMARK_CONNSTR: ${{ steps.create-lakebase-project.outputs.dsn }}
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
- name: Benchmark select-only
uses: ./.github/actions/run-python-test-set
with:
build_type: ${{ env.BUILD_TYPE }}
test_selection: performance
run_in_parallel: false
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 21600 -k test_pgbench_remote_select_only
pg_version: ${{ env.PG_VERSION }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
BENCHMARK_CONNSTR: ${{ steps.create-lakebase-project.outputs.dsn }}
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
- name: Delete Lakebase Project
if: ${{ steps.create-lakebase-project.outputs.project_id && always() }}
uses: ./.github/actions/lakebase-project-delete
with:
api_host: ${{ vars.LAKEBASE_API_HOST }}
org_id: ${{ vars.LAKEBASE_ORG_ID }}
project_id: ${{ steps.create-lakebase-project.outputs.project_id }}
access_token: ${{ secrets.LAKEBASE_ACCESS_TOKEN }}
- name: Create Allure report
id: create-allure-report
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report-generate
with:
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Post to a Slack channel
if: ${{ failure() }}
uses: slackapi/slack-github-action@fcfb566f8b0aab22203f066d80ca1d7e4b5d05b3 # v1.27.1
with:
channel-id: "C06KHQVQ7U3" # on-call-qa-staging-stream
slack-message: |
Lakebase perf testing: ${{ job.status }}
<${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|GitHub Run>
<${{ steps.create-allure-report.outputs.report-url }}|Allure report>
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}

200
Cargo.lock generated
View File

@@ -211,11 +211,11 @@ dependencies = [
[[package]]
name = "async-lock"
version = "3.2.0"
version = "3.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7125e42787d53db9dd54261812ef17e937c95a51e4d291373b670342fa44310c"
checksum = "ff6e472cdea888a4bd64f342f09b3f50e1886d32afe8df3d663c01140b811b18"
dependencies = [
"event-listener 4.0.0",
"event-listener 5.4.0",
"event-listener-strategy",
"pin-project-lite",
]
@@ -1404,9 +1404,9 @@ dependencies = [
[[package]]
name = "concurrent-queue"
version = "2.3.0"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f057a694a54f12365049b0958a1685bb52d567f5593b355fbf685838e873d400"
checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973"
dependencies = [
"crossbeam-utils",
]
@@ -2232,9 +2232,9 @@ checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0"
[[package]]
name = "event-listener"
version = "4.0.0"
version = "5.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "770d968249b5d99410d61f5bf89057f3199a077a04d087092f58e7d10692baae"
checksum = "3492acde4c3fc54c845eaab3eed8bd00c7a7d881f78bfc801e43a93dec1331ae"
dependencies = [
"concurrent-queue",
"parking",
@@ -2243,11 +2243,11 @@ dependencies = [
[[package]]
name = "event-listener-strategy"
version = "0.4.0"
version = "0.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "958e4d70b6d5e81971bebec42271ec641e7ff4e170a6fa605f2b8a8b65cb97d3"
checksum = "8be9f3dfaaffdae2972880079a491a1a8bb7cbed0b8dd7a347f668b4150a3b93"
dependencies = [
"event-listener 4.0.0",
"event-listener 5.4.0",
"pin-project-lite",
]
@@ -2516,6 +2516,20 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "304de19db7028420975a296ab0fcbbc8e69438c4ed254a1e41e2a7f37d5f0e0a"
[[package]]
name = "generator"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d18470a76cb7f8ff746cf1f7470914f900252ec36bbc40b569d74b1258446827"
dependencies = [
"cc",
"cfg-if",
"libc",
"log",
"rustversion",
"windows 0.61.3",
]
[[package]]
name = "generic-array"
version = "0.14.7"
@@ -2834,7 +2848,7 @@ checksum = "f9c7c7c8ac16c798734b8a24560c1362120597c40d5e1459f09498f8f6c8f2ba"
dependencies = [
"cfg-if",
"libc",
"windows",
"windows 0.52.0",
]
[[package]]
@@ -3105,7 +3119,7 @@ dependencies = [
"iana-time-zone-haiku",
"js-sys",
"wasm-bindgen",
"windows-core",
"windows-core 0.52.0",
]
[[package]]
@@ -3656,6 +3670,19 @@ version = "0.4.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30bde2b3dc3671ae49d8e2e9f044c7c005836e7a023ee57cffa25ab82764bb9e"
[[package]]
name = "loom"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "419e0dc8046cb947daa77eb95ae174acfbddb7673b4151f56d1eed8e93fbfaca"
dependencies = [
"cfg-if",
"generator",
"scoped-tls",
"tracing",
"tracing-subscriber",
]
[[package]]
name = "lru"
version = "0.12.3"
@@ -3872,6 +3899,25 @@ dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "moka"
version = "0.12.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9321642ca94a4282428e6ea4af8cc2ca4eac48ac7a6a4ea8f33f76d0ce70926"
dependencies = [
"crossbeam-channel",
"crossbeam-epoch",
"crossbeam-utils",
"loom",
"parking_lot 0.12.1",
"portable-atomic",
"rustc_version",
"smallvec",
"tagptr",
"thiserror 1.0.69",
"uuid",
]
[[package]]
name = "multimap"
version = "0.8.3"
@@ -5385,7 +5431,6 @@ dependencies = [
"futures",
"gettid",
"hashbrown 0.14.5",
"hashlink",
"hex",
"hmac",
"hostname",
@@ -5407,6 +5452,7 @@ dependencies = [
"lasso",
"measured",
"metrics",
"moka",
"once_cell",
"opentelemetry",
"ouroboros",
@@ -6420,6 +6466,12 @@ dependencies = [
"pin-project-lite",
]
[[package]]
name = "scoped-tls"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294"
[[package]]
name = "scopeguard"
version = "1.1.0"
@@ -7269,6 +7321,12 @@ dependencies = [
"winapi",
]
[[package]]
name = "tagptr"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417"
[[package]]
name = "tar"
version = "0.4.40"
@@ -8638,10 +8696,32 @@ version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e48a53791691ab099e5e2ad123536d0fff50652600abaf43bbf952894110d0be"
dependencies = [
"windows-core",
"windows-core 0.52.0",
"windows-targets 0.52.6",
]
[[package]]
name = "windows"
version = "0.61.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9babd3a767a4c1aef6900409f85f5d53ce2544ccdfaa86dad48c91782c6d6893"
dependencies = [
"windows-collections",
"windows-core 0.61.2",
"windows-future",
"windows-link",
"windows-numerics",
]
[[package]]
name = "windows-collections"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3beeceb5e5cfd9eb1d76b381630e82c4241ccd0d27f1a39ed41b2760b255c5e8"
dependencies = [
"windows-core 0.61.2",
]
[[package]]
name = "windows-core"
version = "0.52.0"
@@ -8651,6 +8731,86 @@ dependencies = [
"windows-targets 0.52.6",
]
[[package]]
name = "windows-core"
version = "0.61.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c0fdd3ddb90610c7638aa2b3a3ab2904fb9e5cdbecc643ddb3647212781c4ae3"
dependencies = [
"windows-implement",
"windows-interface",
"windows-link",
"windows-result",
"windows-strings",
]
[[package]]
name = "windows-future"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc6a41e98427b19fe4b73c550f060b59fa592d7d686537eebf9385621bfbad8e"
dependencies = [
"windows-core 0.61.2",
"windows-link",
"windows-threading",
]
[[package]]
name = "windows-implement"
version = "0.60.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a47fddd13af08290e67f4acabf4b459f647552718f683a7b415d290ac744a836"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.100",
]
[[package]]
name = "windows-interface"
version = "0.59.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd9211b69f8dcdfa817bfd14bf1c97c9188afa36f4750130fcdf3f400eca9fa8"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.100",
]
[[package]]
name = "windows-link"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e6ad25900d524eaabdbbb96d20b4311e1e7ae1699af4fb28c17ae66c80d798a"
[[package]]
name = "windows-numerics"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9150af68066c4c5c07ddc0ce30421554771e528bde427614c61038bc2c92c2b1"
dependencies = [
"windows-core 0.61.2",
"windows-link",
]
[[package]]
name = "windows-result"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56f42bd332cc6c8eac5af113fc0c1fd6a8fd2aa08a0119358686e5160d0586c6"
dependencies = [
"windows-link",
]
[[package]]
name = "windows-strings"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56e6c93f3a0c3b36176cb1327a4958a0353d5d166c2a35cb268ace15e91d3b57"
dependencies = [
"windows-link",
]
[[package]]
name = "windows-sys"
version = "0.48.0"
@@ -8709,6 +8869,15 @@ dependencies = [
"windows_x86_64_msvc 0.52.6",
]
[[package]]
name = "windows-threading"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b66463ad2e0ea3bbf808b7f1d371311c80e115c0b71d60efc142cafbcfb057a6"
dependencies = [
"windows-link",
]
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.48.0"
@@ -8845,6 +9014,8 @@ dependencies = [
"clap",
"clap_builder",
"const-oid",
"crossbeam-epoch",
"crossbeam-utils",
"crypto-bigint 0.5.5",
"der 0.7.8",
"deranged",
@@ -8890,6 +9061,7 @@ dependencies = [
"once_cell",
"p256 0.13.2",
"parquet",
"portable-atomic",
"prettyplease",
"proc-macro2",
"prost 0.13.5",

View File

@@ -46,10 +46,10 @@ members = [
"libs/proxy/json",
"libs/proxy/postgres-protocol2",
"libs/proxy/postgres-types2",
"libs/proxy/subzero_core",
"libs/proxy/tokio-postgres2",
"endpoint_storage",
"pgxn/neon/communicator",
"proxy/subzero_core",
]
[workspace.package]
@@ -136,6 +136,7 @@ md5 = "0.7.0"
measured = { version = "0.0.22", features=["lasso"] }
measured-process = { version = "0.0.22" }
memoffset = "0.9"
moka = { version = "0.12", features = ["sync"] }
nix = { version = "0.30.1", features = ["dir", "fs", "mman", "process", "socket", "signal", "poll"] }
# Do not update to >= 7.0.0, at least. The update will have a significant impact
# on compute startup metrics (start_postgres_ms), >= 25% degradation.

View File

@@ -39,13 +39,13 @@ COPY build-tools/patches/pgcopydbv017.patch /pgcopydbv017.patch
RUN if [ "${DEBIAN_VERSION}" = "bookworm" ]; then \
set -e && \
apt update && \
apt install -y --no-install-recommends \
apt-get update && \
apt-get install -y --no-install-recommends \
ca-certificates wget gpg && \
wget -qO - https://www.postgresql.org/media/keys/ACCC4CF8.asc | gpg --dearmor -o /usr/share/keyrings/postgresql-keyring.gpg && \
echo "deb [signed-by=/usr/share/keyrings/postgresql-keyring.gpg] http://apt.postgresql.org/pub/repos/apt bookworm-pgdg main" > /etc/apt/sources.list.d/pgdg.list && \
apt-get update && \
apt install -y --no-install-recommends \
apt-get install -y --no-install-recommends \
build-essential \
autotools-dev \
libedit-dev \
@@ -89,8 +89,7 @@ RUN useradd -ms /bin/bash nonroot -b /home
# Use strict mode for bash to catch errors early
SHELL ["/bin/bash", "-euo", "pipefail", "-c"]
RUN mkdir -p /pgcopydb/bin && \
mkdir -p /pgcopydb/lib && \
RUN mkdir -p /pgcopydb/{bin,lib} && \
chmod -R 755 /pgcopydb && \
chown -R nonroot:nonroot /pgcopydb
@@ -106,8 +105,8 @@ RUN echo 'Acquire::Retries "5";' > /etc/apt/apt.conf.d/80-retries && \
# 'gdb' is included so that we get backtraces of core dumps produced in
# regression tests
RUN set -e \
&& apt update \
&& apt install -y \
&& apt-get update \
&& apt-get install -y --no-install-recommends \
autoconf \
automake \
bison \
@@ -183,22 +182,22 @@ RUN curl -sL "https://github.com/peak/s5cmd/releases/download/v${S5CMD_VERSION}/
ENV LLVM_VERSION=20
RUN curl -fsSL 'https://apt.llvm.org/llvm-snapshot.gpg.key' | apt-key add - \
&& echo "deb http://apt.llvm.org/${DEBIAN_VERSION}/ llvm-toolchain-${DEBIAN_VERSION}-${LLVM_VERSION} main" > /etc/apt/sources.list.d/llvm.stable.list \
&& apt update \
&& apt install -y clang-${LLVM_VERSION} llvm-${LLVM_VERSION} \
&& apt-get update \
&& apt-get install -y --no-install-recommends clang-${LLVM_VERSION} llvm-${LLVM_VERSION} \
&& bash -c 'for f in /usr/bin/clang*-${LLVM_VERSION} /usr/bin/llvm*-${LLVM_VERSION}; do ln -s "${f}" "${f%-${LLVM_VERSION}}"; done' \
&& rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
# Install node
ENV NODE_VERSION=24
RUN curl -fsSL https://deb.nodesource.com/setup_${NODE_VERSION}.x | bash - \
&& apt install -y nodejs \
&& apt-get install -y --no-install-recommends nodejs \
&& rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
# Install docker
RUN curl -fsSL https://download.docker.com/linux/ubuntu/gpg | gpg --dearmor -o /usr/share/keyrings/docker-archive-keyring.gpg \
&& echo "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/docker-archive-keyring.gpg] https://download.docker.com/linux/debian ${DEBIAN_VERSION} stable" > /etc/apt/sources.list.d/docker.list \
&& apt update \
&& apt install -y docker-ce docker-ce-cli \
&& apt-get update \
&& apt-get install -y --no-install-recommends docker-ce docker-ce-cli \
&& rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
# Configure sudo & docker
@@ -215,12 +214,11 @@ RUN curl "https://awscli.amazonaws.com/awscli-exe-linux-$(uname -m).zip" -o "aws
# Mold: A Modern Linker
ENV MOLD_VERSION=v2.37.1
RUN set -e \
&& git clone https://github.com/rui314/mold.git \
&& git clone -b "${MOLD_VERSION}" --depth 1 https://github.com/rui314/mold.git \
&& mkdir mold/build \
&& cd mold/build \
&& git checkout ${MOLD_VERSION} \
&& cd mold/build \
&& cmake -DCMAKE_BUILD_TYPE=Release -DCMAKE_CXX_COMPILER=clang++ .. \
&& cmake --build . -j $(nproc) \
&& cmake --build . -j "$(nproc)" \
&& cmake --install . \
&& cd .. \
&& rm -rf mold
@@ -254,7 +252,7 @@ ENV ICU_VERSION=67.1
ENV ICU_PREFIX=/usr/local/icu
# Download and build static ICU
RUN wget -O /tmp/libicu-${ICU_VERSION}.tgz https://github.com/unicode-org/icu/releases/download/release-${ICU_VERSION//./-}/icu4c-${ICU_VERSION//./_}-src.tgz && \
RUN wget -O "/tmp/libicu-${ICU_VERSION}.tgz" https://github.com/unicode-org/icu/releases/download/release-${ICU_VERSION//./-}/icu4c-${ICU_VERSION//./_}-src.tgz && \
echo "94a80cd6f251a53bd2a997f6f1b5ac6653fe791dfab66e1eb0227740fb86d5dc /tmp/libicu-${ICU_VERSION}.tgz" | sha256sum --check && \
mkdir /tmp/icu && \
pushd /tmp/icu && \
@@ -265,8 +263,7 @@ RUN wget -O /tmp/libicu-${ICU_VERSION}.tgz https://github.com/unicode-org/icu/re
make install && \
popd && \
rm -rf icu && \
rm -f /tmp/libicu-${ICU_VERSION}.tgz && \
popd
rm -f /tmp/libicu-${ICU_VERSION}.tgz
# Switch to nonroot user
USER nonroot:nonroot
@@ -279,19 +276,19 @@ ENV PYTHON_VERSION=3.11.12 \
PYENV_ROOT=/home/nonroot/.pyenv \
PATH=/home/nonroot/.pyenv/shims:/home/nonroot/.pyenv/bin:/home/nonroot/.poetry/bin:$PATH
RUN set -e \
&& cd $HOME \
&& cd "$HOME" \
&& curl -sSO https://raw.githubusercontent.com/pyenv/pyenv-installer/master/bin/pyenv-installer \
&& chmod +x pyenv-installer \
&& ./pyenv-installer \
&& export PYENV_ROOT=/home/nonroot/.pyenv \
&& export PATH="$PYENV_ROOT/bin:$PATH" \
&& export PATH="$PYENV_ROOT/shims:$PATH" \
&& pyenv install ${PYTHON_VERSION} \
&& pyenv global ${PYTHON_VERSION} \
&& pyenv install "${PYTHON_VERSION}" \
&& pyenv global "${PYTHON_VERSION}" \
&& python --version \
&& pip install --upgrade pip \
&& pip install --no-cache-dir --upgrade pip \
&& pip --version \
&& pip install pipenv wheel poetry
&& pip install --no-cache-dir pipenv wheel poetry
# Switch to nonroot user (again)
USER nonroot:nonroot
@@ -317,13 +314,13 @@ RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux
. "$HOME/.cargo/env" && \
cargo --version && rustup --version && \
rustup component add llvm-tools rustfmt clippy && \
cargo install rustfilt --locked --version ${RUSTFILT_VERSION} && \
cargo install cargo-hakari --locked --version ${CARGO_HAKARI_VERSION} && \
cargo install cargo-deny --locked --version ${CARGO_DENY_VERSION} && \
cargo install cargo-hack --locked --version ${CARGO_HACK_VERSION} && \
cargo install cargo-nextest --locked --version ${CARGO_NEXTEST_VERSION} && \
cargo install cargo-chef --locked --version ${CARGO_CHEF_VERSION} && \
cargo install diesel_cli --locked --version ${CARGO_DIESEL_CLI_VERSION} \
cargo install rustfilt --locked --version "${RUSTFILT_VERSION}" && \
cargo install cargo-hakari --locked --version "${CARGO_HAKARI_VERSION}" && \
cargo install cargo-deny --locked --version "${CARGO_DENY_VERSION}" && \
cargo install cargo-hack --locked --version "${CARGO_HACK_VERSION}" && \
cargo install cargo-nextest --locked --version "${CARGO_NEXTEST_VERSION}" && \
cargo install cargo-chef --locked --version "${CARGO_CHEF_VERSION}" && \
cargo install diesel_cli --locked --version "${CARGO_DIESEL_CLI_VERSION}" \
--features postgres-bundled --no-default-features && \
rm -rf /home/nonroot/.cargo/registry && \
rm -rf /home/nonroot/.cargo/git

View File

@@ -413,6 +413,52 @@ struct StartVmMonitorResult {
vm_monitor: Option<JoinHandle<Result<()>>>,
}
/// Databricks-specific environment variables to be passed to the `postgres` sub-process.
pub struct DatabricksEnvVars {
/// The Databricks "endpoint ID" of the compute instance. Used by `postgres` to check
/// the token scopes of internal auth tokens.
pub endpoint_id: String,
/// Hostname of the Databricks workspace URL this compute instance belongs to.
/// Used by postgres to verify Databricks PAT tokens.
pub workspace_host: String,
}
impl DatabricksEnvVars {
pub fn new(compute_spec: &ComputeSpec, compute_id: Option<&String>) -> Self {
// compute_id is a string format of "{endpoint_id}/{compute_idx}"
// endpoint_id is a uuid. We only need to pass down endpoint_id to postgres.
// Panics if compute_id is not set or not in the expected format.
let endpoint_id = compute_id.unwrap().split('/').next().unwrap().to_string();
let workspace_host = compute_spec
.databricks_settings
.as_ref()
.map(|s| s.databricks_workspace_host.clone())
.unwrap_or("".to_string());
Self {
endpoint_id,
workspace_host,
}
}
/// Constants for the names of Databricks-specific postgres environment variables.
const DATABRICKS_ENDPOINT_ID_ENVVAR: &'static str = "DATABRICKS_ENDPOINT_ID";
const DATABRICKS_WORKSPACE_HOST_ENVVAR: &'static str = "DATABRICKS_WORKSPACE_HOST";
/// Convert DatabricksEnvVars to a list of string pairs that can be passed as env vars. Consumes `self`.
pub fn to_env_var_list(self) -> Vec<(String, String)> {
vec![
(
Self::DATABRICKS_ENDPOINT_ID_ENVVAR.to_string(),
self.endpoint_id.clone(),
),
(
Self::DATABRICKS_WORKSPACE_HOST_ENVVAR.to_string(),
self.workspace_host.clone(),
),
]
}
}
impl ComputeNode {
pub fn new(params: ComputeNodeParams, config: ComputeConfig) -> Result<Self> {
let connstr = params.connstr.as_str();
@@ -1411,6 +1457,8 @@ impl ComputeNode {
let pgdata_path = Path::new(&self.params.pgdata);
let tls_config = self.tls_config(&pspec.spec);
let databricks_settings = spec.databricks_settings.as_ref();
let postgres_port = self.params.connstr.port();
// Remove/create an empty pgdata directory and put configuration there.
self.create_pgdata()?;
@@ -1418,8 +1466,11 @@ impl ComputeNode {
pgdata_path,
&self.params,
&pspec.spec,
postgres_port,
self.params.internal_http_port,
tls_config,
databricks_settings,
self.params.lakebase_mode,
)?;
// Syncing safekeepers is only safe with primary nodes: if a primary
@@ -1459,8 +1510,28 @@ impl ComputeNode {
)
})?;
// Update pg_hba.conf received with basebackup.
update_pg_hba(pgdata_path, None)?;
if let Some(settings) = databricks_settings {
copy_tls_certificates(
&settings.pg_compute_tls_settings.key_file,
&settings.pg_compute_tls_settings.cert_file,
pgdata_path,
)?;
// Update pg_hba.conf received with basebackup including additional databricks settings.
update_pg_hba(pgdata_path, Some(&settings.databricks_pg_hba))?;
update_pg_ident(pgdata_path, Some(&settings.databricks_pg_ident))?;
} else {
// Update pg_hba.conf received with basebackup.
update_pg_hba(pgdata_path, None)?;
}
if let Some(databricks_settings) = spec.databricks_settings.as_ref() {
copy_tls_certificates(
&databricks_settings.pg_compute_tls_settings.key_file,
&databricks_settings.pg_compute_tls_settings.cert_file,
pgdata_path,
)?;
}
// Place pg_dynshmem under /dev/shm. This allows us to use
// 'dynamic_shared_memory_type = mmap' so that the files are placed in
@@ -1573,14 +1644,31 @@ impl ComputeNode {
pub fn start_postgres(&self, storage_auth_token: Option<String>) -> Result<PostgresHandle> {
let pgdata_path = Path::new(&self.params.pgdata);
let env_vars: Vec<(String, String)> = if self.params.lakebase_mode {
let databricks_env_vars = {
let state = self.state.lock().unwrap();
let spec = &state.pspec.as_ref().unwrap().spec;
DatabricksEnvVars::new(spec, Some(&self.params.compute_id))
};
info!(
"Starting Postgres for databricks endpoint id: {}",
&databricks_env_vars.endpoint_id
);
let mut env_vars = databricks_env_vars.to_env_var_list();
env_vars.extend(storage_auth_token.map(|t| ("NEON_AUTH_TOKEN".to_string(), t)));
env_vars
} else if let Some(storage_auth_token) = &storage_auth_token {
vec![("NEON_AUTH_TOKEN".to_owned(), storage_auth_token.to_owned())]
} else {
vec![]
};
// Run postgres as a child process.
let mut pg = maybe_cgexec(&self.params.pgbin)
.args(["-D", &self.params.pgdata])
.envs(if let Some(storage_auth_token) = &storage_auth_token {
vec![("NEON_AUTH_TOKEN", storage_auth_token)]
} else {
vec![]
})
.envs(env_vars)
.stderr(Stdio::piped())
.spawn()
.expect("cannot start postgres process");
@@ -1883,12 +1971,16 @@ impl ComputeNode {
// Write new config
let pgdata_path = Path::new(&self.params.pgdata);
let postgres_port = self.params.connstr.port();
config::write_postgres_conf(
pgdata_path,
&self.params,
&spec,
postgres_port,
self.params.internal_http_port,
tls_config,
spec.databricks_settings.as_ref(),
self.params.lakebase_mode,
)?;
self.pg_reload_conf()?;

View File

@@ -7,11 +7,14 @@ use std::io::prelude::*;
use std::path::Path;
use compute_api::responses::TlsConfig;
use compute_api::spec::{ComputeAudit, ComputeMode, ComputeSpec, GenericOption};
use compute_api::spec::{
ComputeAudit, ComputeMode, ComputeSpec, DatabricksSettings, GenericOption,
};
use crate::compute::ComputeNodeParams;
use crate::pg_helpers::{
GenericOptionExt, GenericOptionsSearch, PgOptionsSerialize, escape_conf_value,
DatabricksSettingsExt as _, GenericOptionExt, GenericOptionsSearch, PgOptionsSerialize,
escape_conf_value,
};
use crate::tls::{self, SERVER_CRT, SERVER_KEY};
@@ -40,12 +43,16 @@ pub fn line_in_file(path: &Path, line: &str) -> Result<bool> {
}
/// Create or completely rewrite configuration file specified by `path`
#[allow(clippy::too_many_arguments)]
pub fn write_postgres_conf(
pgdata_path: &Path,
params: &ComputeNodeParams,
spec: &ComputeSpec,
postgres_port: Option<u16>,
extension_server_port: u16,
tls_config: &Option<TlsConfig>,
databricks_settings: Option<&DatabricksSettings>,
lakebase_mode: bool,
) -> Result<()> {
let path = pgdata_path.join("postgresql.conf");
// File::create() destroys the file content if it exists.
@@ -285,6 +292,24 @@ pub fn write_postgres_conf(
writeln!(file, "log_destination='stderr,syslog'")?;
}
if lakebase_mode {
// Explicitly set the port based on the connstr, overriding any previous port setting.
// Note: It is important that we don't specify a different port again after this.
let port = postgres_port.expect("port must be present in connstr");
writeln!(file, "port = {port}")?;
// This is databricks specific settings.
// This should be at the end of the file but before `compute_ctl_temp_override.conf` below
// so that it can override any settings above.
// `compute_ctl_temp_override.conf` is intended to override any settings above during specific operations.
// To prevent potential breakage in the future, we keep it above `compute_ctl_temp_override.conf`.
writeln!(file, "# Databricks settings start")?;
if let Some(settings) = databricks_settings {
writeln!(file, "{}", settings.as_pg_settings())?;
}
writeln!(file, "# Databricks settings end")?;
}
// This is essential to keep this line at the end of the file,
// because it is intended to override any settings above.
writeln!(file, "include_if_exists = 'compute_ctl_temp_override.conf'")?;

View File

@@ -142,7 +142,7 @@ pub fn update_pg_hba(pgdata_path: &Path, databricks_pg_hba: Option<&String>) ->
// Update pg_hba to contains databricks specfic settings before adding neon settings
// PG uses the first record that matches to perform authentication, so we need to have
// our rules before the default ones from neon.
// See https://www.postgresql.org/docs/16/auth-pg-hba-conf.html
// See https://www.postgresql.org/docs/current/auth-pg-hba-conf.html
if let Some(databricks_pg_hba) = databricks_pg_hba {
if config::line_in_file(
&pghba_path,

View File

@@ -793,6 +793,7 @@ impl Endpoint {
autoprewarm: args.autoprewarm,
offload_lfc_interval_seconds: args.offload_lfc_interval_seconds,
suspend_timeout_seconds: -1, // Only used in neon_local.
databricks_settings: None,
};
// this strange code is needed to support respec() in tests

View File

@@ -193,6 +193,9 @@ pub struct ComputeSpec {
///
/// We use this value to derive other values, such as the installed extensions metric.
pub suspend_timeout_seconds: i64,
// Databricks specific options for compute instance.
pub databricks_settings: Option<DatabricksSettings>,
}
/// Feature flag to signal `compute_ctl` to enable certain experimental functionality.

View File

@@ -129,6 +129,12 @@ impl<L: LabelGroup> InfoMetric<L> {
}
}
impl<L: LabelGroup + Default> Default for InfoMetric<L, GaugeState> {
fn default() -> Self {
InfoMetric::new(L::default())
}
}
impl<L: LabelGroup, M: MetricType<Metadata = ()>> InfoMetric<L, M> {
pub fn with_metric(label: L, metric: M) -> Self {
Self {

View File

@@ -185,6 +185,7 @@ impl Client {
ssl_mode: SslMode,
process_id: i32,
secret_key: i32,
write_buf: BytesMut,
) -> Client {
Client {
inner: InnerClient {
@@ -195,7 +196,7 @@ impl Client {
waiting: 0,
received: 0,
},
buffer: Default::default(),
buffer: write_buf,
},
cached_typeinfo: Default::default(),

View File

@@ -47,14 +47,7 @@ impl Encoder<BytesMut> for PostgresCodec {
type Error = io::Error;
fn encode(&mut self, item: BytesMut, dst: &mut BytesMut) -> io::Result<()> {
// When it comes to request/response workflows, we usually flush the entire write
// buffer in order to wait for the response before we send a new request.
// Therefore we can avoid the copy and just replace the buffer.
if dst.is_empty() {
*dst = item;
} else {
dst.extend_from_slice(&item);
}
dst.unsplit(item);
Ok(())
}
}

View File

@@ -77,6 +77,9 @@ where
connect_timeout,
};
let mut stream = stream.into_framed();
let write_buf = std::mem::take(stream.write_buffer_mut());
let (client_tx, conn_rx) = mpsc::unbounded_channel();
let (conn_tx, client_rx) = mpsc::channel(4);
let client = Client::new(
@@ -86,9 +89,9 @@ where
ssl_mode,
process_id,
secret_key,
write_buf,
);
let stream = stream.into_framed();
let connection = Connection::new(stream, conn_tx, conn_rx);
Ok((client, connection))

View File

@@ -229,8 +229,11 @@ where
Poll::Ready(()) => {
trace!("poll_flush: flushed");
// GC the write buffer if we managed to flush
gc_bytesmut(self.stream.write_buffer_mut());
// Since our codec prefers to share the buffer with the `Client`,
// if we don't release our share, then the `Client` would have to re-alloc
// the buffer when they next use it.
debug_assert!(self.stream.write_buffer().is_empty());
*self.stream.write_buffer_mut() = BytesMut::new();
Poll::Ready(Ok(()))
}

View File

@@ -715,7 +715,7 @@ fn start_pageserver(
disk_usage_eviction_state,
deletion_queue.new_client(),
secondary_controller,
feature_resolver,
feature_resolver.clone(),
)
.context("Failed to initialize router state")?,
);
@@ -841,14 +841,14 @@ fn start_pageserver(
} else {
None
},
feature_resolver.clone(),
);
// Spawn a Pageserver gRPC server task. It will spawn separate tasks for
// each stream/request.
// Spawn a Pageserver gRPC server task. It will spawn separate tasks for each request/stream.
// It uses a separate compute request Tokio runtime (COMPUTE_REQUEST_RUNTIME).
//
// TODO: this uses a separate Tokio runtime for the page service. If we want
// other gRPC services, they will need their own port and runtime. Is this
// necessary?
// NB: this port is exposed to computes. It should only provide services that we're okay with
// computes accessing. Internal services should use a separate port.
let mut page_service_grpc = None;
if let Some(grpc_listener) = grpc_listener {
page_service_grpc = Some(GrpcPageServiceHandler::spawn(

View File

@@ -2005,6 +2005,10 @@ async fn put_tenant_location_config_handler(
let state = get_state(&request);
let conf = state.conf;
fail::fail_point!("put-location-conf-handler", |_| {
Err(ApiError::ResourceUnavailable("failpoint".into()))
});
// The `Detached` state is special, it doesn't upsert a tenant, it removes
// its local disk content and drops it from memory.
if let LocationConfigMode::Detached = request_data.config.mode {

View File

@@ -68,6 +68,7 @@ use crate::config::PageServerConf;
use crate::context::{
DownloadBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
};
use crate::feature_resolver::FeatureResolver;
use crate::metrics::{
self, COMPUTE_COMMANDS_COUNTERS, ComputeCommandKind, GetPageBatchBreakReason, LIVE_CONNECTIONS,
MISROUTED_PAGESTREAM_REQUESTS, PAGESTREAM_HANDLER_RESULTS_TOTAL, SmgrOpTimer, TimelineMetrics,
@@ -139,6 +140,7 @@ pub fn spawn(
perf_trace_dispatch: Option<Dispatch>,
tcp_listener: tokio::net::TcpListener,
tls_config: Option<Arc<rustls::ServerConfig>>,
feature_resolver: FeatureResolver,
) -> Listener {
let cancel = CancellationToken::new();
let libpq_ctx = RequestContext::todo_child(
@@ -160,6 +162,7 @@ pub fn spawn(
conf.pg_auth_type,
tls_config,
conf.page_service_pipelining.clone(),
feature_resolver,
libpq_ctx,
cancel.clone(),
)
@@ -218,6 +221,7 @@ pub async fn libpq_listener_main(
auth_type: AuthType,
tls_config: Option<Arc<rustls::ServerConfig>>,
pipelining_config: PageServicePipeliningConfig,
feature_resolver: FeatureResolver,
listener_ctx: RequestContext,
listener_cancel: CancellationToken,
) -> Connections {
@@ -261,6 +265,7 @@ pub async fn libpq_listener_main(
auth_type,
tls_config.clone(),
pipelining_config.clone(),
feature_resolver.clone(),
connection_ctx,
connections_cancel.child_token(),
gate_guard,
@@ -303,6 +308,7 @@ async fn page_service_conn_main(
auth_type: AuthType,
tls_config: Option<Arc<rustls::ServerConfig>>,
pipelining_config: PageServicePipeliningConfig,
feature_resolver: FeatureResolver,
connection_ctx: RequestContext,
cancel: CancellationToken,
gate_guard: GateGuard,
@@ -370,6 +376,7 @@ async fn page_service_conn_main(
perf_span_fields,
connection_ctx,
cancel.clone(),
feature_resolver.clone(),
gate_guard,
);
let pgbackend =
@@ -421,6 +428,8 @@ struct PageServerHandler {
pipelining_config: PageServicePipeliningConfig,
get_vectored_concurrent_io: GetVectoredConcurrentIo,
feature_resolver: FeatureResolver,
gate_guard: GateGuard,
}
@@ -587,6 +596,15 @@ impl timeline::handle::TenantManager<TenantManagerTypes> for TenantManagerWrappe
}
}
/// Whether to hold the applied GC cutoff guard when processing GetPage requests.
/// This is determined once at the start of pagestream subprotocol handling based on
/// feature flags, configuration, and test conditions.
#[derive(Debug, Clone, Copy)]
enum HoldAppliedGcCutoffGuard {
Yes,
No,
}
#[derive(thiserror::Error, Debug)]
enum PageStreamError {
/// We encountered an error that should prompt the client to reconnect:
@@ -730,6 +748,7 @@ enum BatchedFeMessage {
GetPage {
span: Span,
shard: WeakHandle<TenantManagerTypes>,
applied_gc_cutoff_guard: Option<RcuReadGuard<Lsn>>,
pages: SmallVec<[BatchedGetPageRequest; 1]>,
batch_break_reason: GetPageBatchBreakReason,
},
@@ -909,6 +928,7 @@ impl PageServerHandler {
perf_span_fields: ConnectionPerfSpanFields,
connection_ctx: RequestContext,
cancel: CancellationToken,
feature_resolver: FeatureResolver,
gate_guard: GateGuard,
) -> Self {
PageServerHandler {
@@ -920,6 +940,7 @@ impl PageServerHandler {
cancel,
pipelining_config,
get_vectored_concurrent_io,
feature_resolver,
gate_guard,
}
}
@@ -959,6 +980,7 @@ impl PageServerHandler {
ctx: &RequestContext,
protocol_version: PagestreamProtocolVersion,
parent_span: Span,
hold_gc_cutoff_guard: HoldAppliedGcCutoffGuard,
) -> Result<Option<BatchedFeMessage>, QueryError>
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
@@ -1196,19 +1218,27 @@ impl PageServerHandler {
})
.await?;
let applied_gc_cutoff_guard = shard.get_applied_gc_cutoff_lsn(); // hold guard
// We're holding the Handle
let effective_lsn = match Self::effective_request_lsn(
&shard,
shard.get_last_record_lsn(),
req.hdr.request_lsn,
req.hdr.not_modified_since,
&shard.get_applied_gc_cutoff_lsn(),
&applied_gc_cutoff_guard,
) {
Ok(lsn) => lsn,
Err(e) => {
return respond_error!(span, e);
}
};
let applied_gc_cutoff_guard = match hold_gc_cutoff_guard {
HoldAppliedGcCutoffGuard::Yes => Some(applied_gc_cutoff_guard),
HoldAppliedGcCutoffGuard::No => {
drop(applied_gc_cutoff_guard);
None
}
};
let batch_wait_ctx = if ctx.has_perf_span() {
Some(
@@ -1229,6 +1259,7 @@ impl PageServerHandler {
BatchedFeMessage::GetPage {
span,
shard: shard.downgrade(),
applied_gc_cutoff_guard,
pages: smallvec![BatchedGetPageRequest {
req,
timer,
@@ -1329,13 +1360,28 @@ impl PageServerHandler {
match (eligible_batch, this_msg) {
(
BatchedFeMessage::GetPage {
pages: accum_pages, ..
pages: accum_pages,
applied_gc_cutoff_guard: accum_applied_gc_cutoff_guard,
..
},
BatchedFeMessage::GetPage {
pages: this_pages, ..
pages: this_pages,
applied_gc_cutoff_guard: this_applied_gc_cutoff_guard,
..
},
) => {
accum_pages.extend(this_pages);
// the minimum of the two guards will keep data for both alive
match (&accum_applied_gc_cutoff_guard, this_applied_gc_cutoff_guard) {
(None, None) => (),
(None, Some(this)) => *accum_applied_gc_cutoff_guard = Some(this),
(Some(_), None) => (),
(Some(accum), Some(this)) => {
if **accum > *this {
*accum_applied_gc_cutoff_guard = Some(this);
}
}
};
Ok(())
}
#[cfg(feature = "testing")]
@@ -1650,6 +1696,7 @@ impl PageServerHandler {
BatchedFeMessage::GetPage {
span,
shard,
applied_gc_cutoff_guard,
pages,
batch_break_reason,
} => {
@@ -1669,6 +1716,7 @@ impl PageServerHandler {
.instrument(span.clone())
.await;
assert_eq!(res.len(), npages);
drop(applied_gc_cutoff_guard);
res
},
span,
@@ -1750,7 +1798,7 @@ impl PageServerHandler {
/// Coding discipline within this function: all interaction with the `pgb` connection
/// needs to be sensitive to connection shutdown, currently signalled via [`Self::cancel`].
/// This is so that we can shutdown page_service quickly.
#[instrument(skip_all)]
#[instrument(skip_all, fields(hold_gc_cutoff_guard))]
async fn handle_pagerequests<IO>(
&mut self,
pgb: &mut PostgresBackend<IO>,
@@ -1796,6 +1844,30 @@ impl PageServerHandler {
.take()
.expect("implementation error: timeline_handles should not be locked");
// Evaluate the expensive feature resolver check once per pagestream subprotocol handling
// instead of once per GetPage request. This is shared between pipelined and serial paths.
let hold_gc_cutoff_guard = if cfg!(test) || cfg!(feature = "testing") {
HoldAppliedGcCutoffGuard::Yes
} else {
// Use the global feature resolver with the tenant ID directly, avoiding the need
// to get a timeline/shard which might not be available on this pageserver node.
let empty_properties = std::collections::HashMap::new();
match self.feature_resolver.evaluate_boolean(
"page-service-getpage-hold-applied-gc-cutoff-guard",
tenant_id,
&empty_properties,
) {
Ok(()) => HoldAppliedGcCutoffGuard::Yes,
Err(_) => HoldAppliedGcCutoffGuard::No,
}
};
// record it in the span of handle_pagerequests so that both the request_span
// and the pipeline implementation spans contains the field.
Span::current().record(
"hold_gc_cutoff_guard",
tracing::field::debug(&hold_gc_cutoff_guard),
);
let request_span = info_span!("request");
let ((pgb_reader, timeline_handles), result) = match self.pipelining_config.clone() {
PageServicePipeliningConfig::Pipelined(pipelining_config) => {
@@ -1809,6 +1881,7 @@ impl PageServerHandler {
pipelining_config,
protocol_version,
io_concurrency,
hold_gc_cutoff_guard,
&ctx,
)
.await
@@ -1823,6 +1896,7 @@ impl PageServerHandler {
request_span,
protocol_version,
io_concurrency,
hold_gc_cutoff_guard,
&ctx,
)
.await
@@ -1851,6 +1925,7 @@ impl PageServerHandler {
request_span: Span,
protocol_version: PagestreamProtocolVersion,
io_concurrency: IoConcurrency,
hold_gc_cutoff_guard: HoldAppliedGcCutoffGuard,
ctx: &RequestContext,
) -> (
(PostgresBackendReader<IO>, TimelineHandles),
@@ -1872,6 +1947,7 @@ impl PageServerHandler {
ctx,
protocol_version,
request_span.clone(),
hold_gc_cutoff_guard,
)
.await;
let msg = match msg {
@@ -1919,6 +1995,7 @@ impl PageServerHandler {
pipelining_config: PageServicePipeliningConfigPipelined,
protocol_version: PagestreamProtocolVersion,
io_concurrency: IoConcurrency,
hold_gc_cutoff_guard: HoldAppliedGcCutoffGuard,
ctx: &RequestContext,
) -> (
(PostgresBackendReader<IO>, TimelineHandles),
@@ -2022,6 +2099,7 @@ impl PageServerHandler {
&ctx,
protocol_version,
request_span.clone(),
hold_gc_cutoff_guard,
)
.await;
let Some(read_res) = read_res.transpose() else {
@@ -2068,6 +2146,7 @@ impl PageServerHandler {
pages,
span: _,
shard: _,
applied_gc_cutoff_guard: _,
batch_break_reason: _,
} = &mut batch
{
@@ -3429,8 +3508,6 @@ impl GrpcPageServiceHandler {
/// NB: errors returned from here are intercepted in get_pages(), and may be converted to a
/// GetPageResponse with an appropriate status code to avoid terminating the stream.
///
/// TODO: verify that the requested pages belong to this shard.
///
/// TODO: get_vectored() currently enforces a batch limit of 32. Postgres will typically send
/// batches up to effective_io_concurrency = 100. Either we have to accept large batches, or
/// split them up in the client or server.
@@ -3456,6 +3533,19 @@ impl GrpcPageServiceHandler {
lsn = %req.read_lsn,
);
for &blkno in &req.block_numbers {
let shard = timeline.get_shard_identity();
let key = rel_block_to_key(req.rel, blkno);
if !shard.is_key_local(&key) {
return Err(tonic::Status::invalid_argument(format!(
"block {blkno} of relation {} requested on wrong shard {} (is on {})",
req.rel,
timeline.get_shard_index(),
ShardIndex::new(shard.get_shard_number(&key), shard.count),
)));
}
}
let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn(); // hold guard
let effective_lsn = PageServerHandler::effective_request_lsn(
&timeline,

View File

@@ -70,7 +70,7 @@ use tracing::*;
use utils::generation::Generation;
use utils::guard_arc_swap::GuardArcSwap;
use utils::id::TimelineId;
use utils::logging::{MonitorSlowFutureCallback, monitor_slow_future};
use utils::logging::{MonitorSlowFutureCallback, log_slow, monitor_slow_future};
use utils::lsn::{AtomicLsn, Lsn, RecordLsn};
use utils::postgres_client::PostgresClientProtocol;
use utils::rate_limit::RateLimit;
@@ -6898,7 +6898,13 @@ impl Timeline {
write_guard.store_and_unlock(new_gc_cutoff)
};
waitlist.wait().await;
let waitlist_wait_fut = std::pin::pin!(waitlist.wait());
log_slow(
"applied_gc_cutoff waitlist wait",
Duration::from_secs(30),
waitlist_wait_fut,
)
.await;
info!("GC starting");

View File

@@ -33,7 +33,6 @@ env_logger.workspace = true
framed-websockets.workspace = true
futures.workspace = true
hashbrown.workspace = true
hashlink.workspace = true
hex.workspace = true
hmac.workspace = true
hostname.workspace = true
@@ -54,6 +53,7 @@ json = { path = "../libs/proxy/json" }
lasso = { workspace = true, features = ["multi-threaded"] }
measured = { workspace = true, features = ["lasso"] }
metrics.workspace = true
moka.workspace = true
once_cell.workspace = true
opentelemetry = { workspace = true, features = ["trace"] }
papaya = "0.2.0"
@@ -110,7 +110,7 @@ zerocopy.workspace = true
# uncomment this to use the real subzero-core crate
# subzero-core = { git = "https://github.com/neondatabase/subzero", rev = "396264617e78e8be428682f87469bb25429af88a", features = ["postgresql"], optional = true }
# this is a stub for the subzero-core crate
subzero-core = { path = "./subzero_core", features = ["postgresql"], optional = true}
subzero-core = { path = "../libs/proxy/subzero_core", features = ["postgresql"], optional = true}
ouroboros = { version = "0.18", optional = true }
# jwt stuff

View File

@@ -8,11 +8,12 @@ use tracing::{info, info_span};
use crate::auth::backend::ComputeUserInfo;
use crate::cache::Cached;
use crate::cache::node_info::CachedNodeInfo;
use crate::compute::AuthInfo;
use crate::config::AuthenticationConfig;
use crate::context::RequestContext;
use crate::control_plane::client::cplane_proxy_v1;
use crate::control_plane::{self, CachedNodeInfo, NodeInfo};
use crate::control_plane::{self, NodeInfo};
use crate::error::{ReportableError, UserFacingError};
use crate::pqproto::BeMessage;
use crate::proxy::NeonOptions;

View File

@@ -16,14 +16,14 @@ use tracing::{debug, info};
use crate::auth::{self, ComputeUserInfoMaybeEndpoint, validate_password_and_exchange};
use crate::cache::Cached;
use crate::cache::node_info::CachedNodeInfo;
use crate::config::AuthenticationConfig;
use crate::context::RequestContext;
use crate::control_plane::client::ControlPlaneClient;
use crate::control_plane::errors::GetAuthInfoError;
use crate::control_plane::messages::EndpointRateLimitConfig;
use crate::control_plane::{
self, AccessBlockerFlags, AuthSecret, CachedNodeInfo, ControlPlaneApi, EndpointAccessControl,
RoleAccessControl,
self, AccessBlockerFlags, AuthSecret, ControlPlaneApi, EndpointAccessControl, RoleAccessControl,
};
use crate::intern::EndpointIdInt;
use crate::pqproto::BeMessage;
@@ -433,11 +433,12 @@ mod tests {
use super::auth_quirks;
use super::jwt::JwkCache;
use crate::auth::{ComputeUserInfoMaybeEndpoint, IpPattern};
use crate::cache::node_info::CachedNodeInfo;
use crate::config::AuthenticationConfig;
use crate::context::RequestContext;
use crate::control_plane::messages::EndpointRateLimitConfig;
use crate::control_plane::{
self, AccessBlockerFlags, CachedNodeInfo, EndpointAccessControl, RoleAccessControl,
self, AccessBlockerFlags, EndpointAccessControl, RoleAccessControl,
};
use crate::proxy::NeonOptions;
use crate::rate_limiter::EndpointRateLimiter;

View File

@@ -29,7 +29,7 @@ use crate::config::{
};
use crate::control_plane::locks::ApiLocks;
use crate::http::health_server::AppMetrics;
use crate::metrics::{Metrics, ThreadPoolMetrics};
use crate::metrics::{Metrics, ServiceInfo, ThreadPoolMetrics};
use crate::rate_limiter::{EndpointRateLimiter, LeakyBucketConfig, RateBucketInfo};
use crate::scram::threadpool::ThreadPool;
use crate::serverless::cancel_set::CancelSet;
@@ -207,6 +207,11 @@ pub async fn run() -> anyhow::Result<()> {
endpoint_rate_limiter,
);
Metrics::get()
.service
.info
.set_label(ServiceInfo::running());
match futures::future::select(pin!(maintenance_tasks.join_next()), pin!(task)).await {
// exit immediately on maintenance task completion
Either::Left((Some(res), _)) => match crate::error::flatten_err(res)? {},

View File

@@ -26,7 +26,7 @@ use utils::project_git_version;
use utils::sentry_init::init_sentry;
use crate::context::RequestContext;
use crate::metrics::{Metrics, ThreadPoolMetrics};
use crate::metrics::{Metrics, ServiceInfo, ThreadPoolMetrics};
use crate::pglb::TlsRequired;
use crate::pqproto::FeStartupPacket;
use crate::protocol2::ConnectionInfo;
@@ -135,6 +135,12 @@ pub async fn run() -> anyhow::Result<()> {
cancellation_token.clone(),
))
.map(crate::error::flatten_err);
Metrics::get()
.service
.info
.set_label(ServiceInfo::running());
let signals_task = tokio::spawn(crate::signals::handle(cancellation_token, || {}));
// the signal task cant ever succeed.

View File

@@ -40,7 +40,7 @@ use crate::config::{
};
use crate::context::parquet::ParquetUploadArgs;
use crate::http::health_server::AppMetrics;
use crate::metrics::Metrics;
use crate::metrics::{Metrics, ServiceInfo};
use crate::rate_limiter::{EndpointRateLimiter, RateBucketInfo, WakeComputeRateLimiter};
use crate::redis::connection_with_credentials_provider::ConnectionWithCredentialsProvider;
use crate::redis::kv_ops::RedisKVClient;
@@ -538,7 +538,7 @@ pub async fn run() -> anyhow::Result<()> {
maintenance_tasks.spawn(async move {
loop {
tokio::time::sleep(Duration::from_secs(600)).await;
db_schema_cache.flush();
db_schema_cache.0.run_pending_tasks();
}
});
}
@@ -590,6 +590,11 @@ pub async fn run() -> anyhow::Result<()> {
}
}
Metrics::get()
.service
.info
.set_label(ServiceInfo::running());
let maintenance = loop {
// get one complete task
match futures::future::select(
@@ -711,12 +716,7 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
info!("Using DbSchemaCache with options={db_schema_cache_config:?}");
let db_schema_cache = if args.is_rest_broker {
Some(DbSchemaCache::new(
"db_schema_cache",
db_schema_cache_config.size,
db_schema_cache_config.ttl,
true,
))
Some(DbSchemaCache::new(db_schema_cache_config))
} else {
None
};

View File

@@ -1,4 +1,12 @@
use std::ops::{Deref, DerefMut};
use std::time::{Duration, Instant};
use moka::Expiry;
use crate::control_plane::messages::ControlPlaneErrorMessage;
/// Default TTL used when caching errors from control plane.
pub const DEFAULT_ERROR_TTL: Duration = Duration::from_secs(30);
/// A generic trait which exposes types of cache's key and value,
/// as well as the notion of cache entry invalidation.
@@ -10,20 +18,16 @@ pub(crate) trait Cache {
/// Entry's value.
type Value;
/// Used for entry invalidation.
type LookupInfo<Key>;
/// Invalidate an entry using a lookup info.
/// We don't have an empty default impl because it's error-prone.
fn invalidate(&self, _: &Self::LookupInfo<Self::Key>);
fn invalidate(&self, _: &Self::Key);
}
impl<C: Cache> Cache for &C {
type Key = C::Key;
type Value = C::Value;
type LookupInfo<Key> = C::LookupInfo<Key>;
fn invalidate(&self, info: &Self::LookupInfo<Self::Key>) {
fn invalidate(&self, info: &Self::Key) {
C::invalidate(self, info);
}
}
@@ -31,7 +35,7 @@ impl<C: Cache> Cache for &C {
/// Wrapper for convenient entry invalidation.
pub(crate) struct Cached<C: Cache, V = <C as Cache>::Value> {
/// Cache + lookup info.
pub(crate) token: Option<(C, C::LookupInfo<C::Key>)>,
pub(crate) token: Option<(C, C::Key)>,
/// The value itself.
pub(crate) value: V,
@@ -43,23 +47,6 @@ impl<C: Cache, V> Cached<C, V> {
Self { token: None, value }
}
pub(crate) fn take_value(self) -> (Cached<C, ()>, V) {
(
Cached {
token: self.token,
value: (),
},
self.value,
)
}
pub(crate) fn map<U>(self, f: impl FnOnce(V) -> U) -> Cached<C, U> {
Cached {
token: self.token,
value: f(self.value),
}
}
/// Drop this entry from a cache if it's still there.
pub(crate) fn invalidate(self) -> V {
if let Some((cache, info)) = &self.token {
@@ -87,3 +74,59 @@ impl<C: Cache, V> DerefMut for Cached<C, V> {
&mut self.value
}
}
pub type ControlPlaneResult<T> = Result<T, Box<ControlPlaneErrorMessage>>;
#[derive(Clone, Copy)]
pub struct CplaneExpiry {
pub error: Duration,
}
impl Default for CplaneExpiry {
fn default() -> Self {
Self {
error: DEFAULT_ERROR_TTL,
}
}
}
impl CplaneExpiry {
pub fn expire_early<V>(
&self,
value: &ControlPlaneResult<V>,
updated: Instant,
) -> Option<Duration> {
match value {
Ok(_) => None,
Err(err) => Some(self.expire_err_early(err, updated)),
}
}
pub fn expire_err_early(&self, err: &ControlPlaneErrorMessage, updated: Instant) -> Duration {
err.status
.as_ref()
.and_then(|s| s.details.retry_info.as_ref())
.map_or(self.error, |r| r.retry_at.into_std() - updated)
}
}
impl<K, V> Expiry<K, ControlPlaneResult<V>> for CplaneExpiry {
fn expire_after_create(
&self,
_key: &K,
value: &ControlPlaneResult<V>,
created_at: Instant,
) -> Option<Duration> {
self.expire_early(value, created_at)
}
fn expire_after_update(
&self,
_key: &K,
value: &ControlPlaneResult<V>,
updated_at: Instant,
_duration_until_expiry: Option<Duration>,
) -> Option<Duration> {
self.expire_early(value, updated_at)
}
}

View File

@@ -1,6 +1,5 @@
pub(crate) mod common;
pub(crate) mod node_info;
pub(crate) mod project_info;
mod timed_lru;
pub(crate) use common::{Cache, Cached};
pub(crate) use timed_lru::TimedLru;
pub(crate) use common::{Cached, ControlPlaneResult, CplaneExpiry};

47
proxy/src/cache/node_info.rs vendored Normal file
View File

@@ -0,0 +1,47 @@
use crate::cache::common::Cache;
use crate::cache::{Cached, ControlPlaneResult, CplaneExpiry};
use crate::config::CacheOptions;
use crate::control_plane::NodeInfo;
use crate::types::EndpointCacheKey;
pub(crate) struct NodeInfoCache(moka::sync::Cache<EndpointCacheKey, ControlPlaneResult<NodeInfo>>);
pub(crate) type CachedNodeInfo = Cached<&'static NodeInfoCache, NodeInfo>;
impl Cache for NodeInfoCache {
type Key = EndpointCacheKey;
type Value = ControlPlaneResult<NodeInfo>;
fn invalidate(&self, info: &EndpointCacheKey) {
self.0.invalidate(info);
}
}
impl NodeInfoCache {
pub fn new(config: CacheOptions) -> Self {
let builder = moka::sync::Cache::builder()
.name("node_info_cache")
.expire_after(CplaneExpiry::default());
let builder = config.moka(builder);
Self(builder.build())
}
pub fn insert(&self, key: EndpointCacheKey, value: ControlPlaneResult<NodeInfo>) {
self.0.insert(key, value);
}
pub fn get(&'static self, key: &EndpointCacheKey) -> Option<ControlPlaneResult<NodeInfo>> {
self.0.get(key)
}
pub fn get_entry(
&'static self,
key: &EndpointCacheKey,
) -> Option<ControlPlaneResult<CachedNodeInfo>> {
self.get(key).map(|res| {
res.map(|value| Cached {
token: Some((self, key.clone())),
value,
})
})
}
}

View File

@@ -1,84 +1,17 @@
use std::collections::{HashMap, HashSet, hash_map};
use std::collections::HashSet;
use std::convert::Infallible;
use std::time::Duration;
use async_trait::async_trait;
use clashmap::ClashMap;
use clashmap::mapref::one::Ref;
use rand::Rng;
use tokio::time::Instant;
use moka::sync::Cache;
use tracing::{debug, info};
use crate::cache::common::{ControlPlaneResult, CplaneExpiry};
use crate::config::ProjectInfoCacheOptions;
use crate::control_plane::messages::{ControlPlaneErrorMessage, Reason};
use crate::control_plane::{EndpointAccessControl, RoleAccessControl};
use crate::intern::{AccountIdInt, EndpointIdInt, ProjectIdInt, RoleNameInt};
use crate::types::{EndpointId, RoleName};
#[async_trait]
pub(crate) trait ProjectInfoCache {
fn invalidate_endpoint_access(&self, endpoint_id: EndpointIdInt);
fn invalidate_endpoint_access_for_project(&self, project_id: ProjectIdInt);
fn invalidate_endpoint_access_for_org(&self, account_id: AccountIdInt);
fn invalidate_role_secret_for_project(&self, project_id: ProjectIdInt, role_name: RoleNameInt);
}
struct Entry<T> {
expires_at: Instant,
value: T,
}
impl<T> Entry<T> {
pub(crate) fn new(value: T, ttl: Duration) -> Self {
Self {
expires_at: Instant::now() + ttl,
value,
}
}
pub(crate) fn get(&self) -> Option<&T> {
(!self.is_expired()).then_some(&self.value)
}
fn is_expired(&self) -> bool {
self.expires_at <= Instant::now()
}
}
struct EndpointInfo {
role_controls: HashMap<RoleNameInt, Entry<ControlPlaneResult<RoleAccessControl>>>,
controls: Option<Entry<ControlPlaneResult<EndpointAccessControl>>>,
}
type ControlPlaneResult<T> = Result<T, Box<ControlPlaneErrorMessage>>;
impl EndpointInfo {
pub(crate) fn get_role_secret_with_ttl(
&self,
role_name: RoleNameInt,
) -> Option<(ControlPlaneResult<RoleAccessControl>, Duration)> {
let entry = self.role_controls.get(&role_name)?;
let ttl = entry.expires_at - Instant::now();
Some((entry.get()?.clone(), ttl))
}
pub(crate) fn get_controls_with_ttl(
&self,
) -> Option<(ControlPlaneResult<EndpointAccessControl>, Duration)> {
let entry = self.controls.as_ref()?;
let ttl = entry.expires_at - Instant::now();
Some((entry.get()?.clone(), ttl))
}
pub(crate) fn invalidate_endpoint(&mut self) {
self.controls = None;
}
pub(crate) fn invalidate_role_secret(&mut self, role_name: RoleNameInt) {
self.role_controls.remove(&role_name);
}
}
/// Cache for project info.
/// This is used to cache auth data for endpoints.
/// Invalidation is done by console notifications or by TTL (if console notifications are disabled).
@@ -86,8 +19,9 @@ impl EndpointInfo {
/// We also store endpoint-to-project mapping in the cache, to be able to access per-endpoint data.
/// One may ask, why the data is stored per project, when on the user request there is only data about the endpoint available?
/// On the cplane side updates are done per project (or per branch), so it's easier to invalidate the whole project cache.
pub struct ProjectInfoCacheImpl {
cache: ClashMap<EndpointIdInt, EndpointInfo>,
pub struct ProjectInfoCache {
role_controls: Cache<(EndpointIdInt, RoleNameInt), ControlPlaneResult<RoleAccessControl>>,
ep_controls: Cache<EndpointIdInt, ControlPlaneResult<EndpointAccessControl>>,
project2ep: ClashMap<ProjectIdInt, HashSet<EndpointIdInt>>,
// FIXME(stefan): we need a way to GC the account2ep map.
@@ -96,16 +30,13 @@ pub struct ProjectInfoCacheImpl {
config: ProjectInfoCacheOptions,
}
#[async_trait]
impl ProjectInfoCache for ProjectInfoCacheImpl {
fn invalidate_endpoint_access(&self, endpoint_id: EndpointIdInt) {
impl ProjectInfoCache {
pub fn invalidate_endpoint_access(&self, endpoint_id: EndpointIdInt) {
info!("invalidating endpoint access for `{endpoint_id}`");
if let Some(mut endpoint_info) = self.cache.get_mut(&endpoint_id) {
endpoint_info.invalidate_endpoint();
}
self.ep_controls.invalidate(&endpoint_id);
}
fn invalidate_endpoint_access_for_project(&self, project_id: ProjectIdInt) {
pub fn invalidate_endpoint_access_for_project(&self, project_id: ProjectIdInt) {
info!("invalidating endpoint access for project `{project_id}`");
let endpoints = self
.project2ep
@@ -113,13 +44,11 @@ impl ProjectInfoCache for ProjectInfoCacheImpl {
.map(|kv| kv.value().clone())
.unwrap_or_default();
for endpoint_id in endpoints {
if let Some(mut endpoint_info) = self.cache.get_mut(&endpoint_id) {
endpoint_info.invalidate_endpoint();
}
self.ep_controls.invalidate(&endpoint_id);
}
}
fn invalidate_endpoint_access_for_org(&self, account_id: AccountIdInt) {
pub fn invalidate_endpoint_access_for_org(&self, account_id: AccountIdInt) {
info!("invalidating endpoint access for org `{account_id}`");
let endpoints = self
.account2ep
@@ -127,13 +56,15 @@ impl ProjectInfoCache for ProjectInfoCacheImpl {
.map(|kv| kv.value().clone())
.unwrap_or_default();
for endpoint_id in endpoints {
if let Some(mut endpoint_info) = self.cache.get_mut(&endpoint_id) {
endpoint_info.invalidate_endpoint();
}
self.ep_controls.invalidate(&endpoint_id);
}
}
fn invalidate_role_secret_for_project(&self, project_id: ProjectIdInt, role_name: RoleNameInt) {
pub fn invalidate_role_secret_for_project(
&self,
project_id: ProjectIdInt,
role_name: RoleNameInt,
) {
info!(
"invalidating role secret for project_id `{}` and role_name `{}`",
project_id, role_name,
@@ -144,47 +75,52 @@ impl ProjectInfoCache for ProjectInfoCacheImpl {
.map(|kv| kv.value().clone())
.unwrap_or_default();
for endpoint_id in endpoints {
if let Some(mut endpoint_info) = self.cache.get_mut(&endpoint_id) {
endpoint_info.invalidate_role_secret(role_name);
}
self.role_controls.invalidate(&(endpoint_id, role_name));
}
}
}
impl ProjectInfoCacheImpl {
impl ProjectInfoCache {
pub(crate) fn new(config: ProjectInfoCacheOptions) -> Self {
// we cache errors for 30 seconds, unless retry_at is set.
let expiry = CplaneExpiry::default();
Self {
cache: ClashMap::new(),
role_controls: Cache::builder()
.name("role_access_controls")
.max_capacity(config.size * config.max_roles)
.time_to_live(config.ttl)
.expire_after(expiry)
.build(),
ep_controls: Cache::builder()
.name("endpoint_access_controls")
.max_capacity(config.size)
.time_to_live(config.ttl)
.expire_after(expiry)
.build(),
project2ep: ClashMap::new(),
account2ep: ClashMap::new(),
config,
}
}
fn get_endpoint_cache(
&self,
endpoint_id: &EndpointId,
) -> Option<Ref<'_, EndpointIdInt, EndpointInfo>> {
let endpoint_id = EndpointIdInt::get(endpoint_id)?;
self.cache.get(&endpoint_id)
}
pub(crate) fn get_role_secret_with_ttl(
pub(crate) fn get_role_secret(
&self,
endpoint_id: &EndpointId,
role_name: &RoleName,
) -> Option<(ControlPlaneResult<RoleAccessControl>, Duration)> {
) -> Option<ControlPlaneResult<RoleAccessControl>> {
let endpoint_id = EndpointIdInt::get(endpoint_id)?;
let role_name = RoleNameInt::get(role_name)?;
let endpoint_info = self.get_endpoint_cache(endpoint_id)?;
endpoint_info.get_role_secret_with_ttl(role_name)
self.role_controls.get(&(endpoint_id, role_name))
}
pub(crate) fn get_endpoint_access_with_ttl(
pub(crate) fn get_endpoint_access(
&self,
endpoint_id: &EndpointId,
) -> Option<(ControlPlaneResult<EndpointAccessControl>, Duration)> {
let endpoint_info = self.get_endpoint_cache(endpoint_id)?;
endpoint_info.get_controls_with_ttl()
) -> Option<ControlPlaneResult<EndpointAccessControl>> {
let endpoint_id = EndpointIdInt::get(endpoint_id)?;
self.ep_controls.get(&endpoint_id)
}
pub(crate) fn insert_endpoint_access(
@@ -203,34 +139,14 @@ impl ProjectInfoCacheImpl {
self.insert_project2endpoint(project_id, endpoint_id);
}
if self.cache.len() >= self.config.size {
// If there are too many entries, wait until the next gc cycle.
return;
}
debug!(
key = &*endpoint_id,
"created a cache entry for endpoint access"
);
let controls = Some(Entry::new(Ok(controls), self.config.ttl));
let role_controls = Entry::new(Ok(role_controls), self.config.ttl);
match self.cache.entry(endpoint_id) {
clashmap::Entry::Vacant(e) => {
e.insert(EndpointInfo {
role_controls: HashMap::from_iter([(role_name, role_controls)]),
controls,
});
}
clashmap::Entry::Occupied(mut e) => {
let ep = e.get_mut();
ep.controls = controls;
if ep.role_controls.len() < self.config.max_roles {
ep.role_controls.insert(role_name, role_controls);
}
}
}
self.ep_controls.insert(endpoint_id, Ok(controls));
self.role_controls
.insert((endpoint_id, role_name), Ok(role_controls));
}
pub(crate) fn insert_endpoint_access_err(
@@ -238,55 +154,30 @@ impl ProjectInfoCacheImpl {
endpoint_id: EndpointIdInt,
role_name: RoleNameInt,
msg: Box<ControlPlaneErrorMessage>,
ttl: Option<Duration>,
) {
if self.cache.len() >= self.config.size {
// If there are too many entries, wait until the next gc cycle.
return;
}
debug!(
key = &*endpoint_id,
"created a cache entry for an endpoint access error"
);
let ttl = ttl.unwrap_or(self.config.ttl);
let controls = if msg.get_reason() == Reason::RoleProtected {
// RoleProtected is the only role-specific error that control plane can give us.
// If a given role name does not exist, it still returns a successful response,
// just with an empty secret.
None
} else {
// We can cache all the other errors in EndpointInfo.controls,
// because they don't depend on what role name we pass to control plane.
Some(Entry::new(Err(msg.clone()), ttl))
};
let role_controls = Entry::new(Err(msg), ttl);
match self.cache.entry(endpoint_id) {
clashmap::Entry::Vacant(e) => {
e.insert(EndpointInfo {
role_controls: HashMap::from_iter([(role_name, role_controls)]),
controls,
// RoleProtected is the only role-specific error that control plane can give us.
// If a given role name does not exist, it still returns a successful response,
// just with an empty secret.
if msg.get_reason() != Reason::RoleProtected {
// We can cache all the other errors in ep_controls because they don't
// depend on what role name we pass to control plane.
self.ep_controls
.entry(endpoint_id)
.and_compute_with(|entry| match entry {
// leave the entry alone if it's already Ok
Some(entry) if entry.value().is_ok() => moka::ops::compute::Op::Nop,
// replace the entry
_ => moka::ops::compute::Op::Put(Err(msg.clone())),
});
}
clashmap::Entry::Occupied(mut e) => {
let ep = e.get_mut();
if let Some(entry) = &ep.controls
&& !entry.is_expired()
&& entry.value.is_ok()
{
// If we have cached non-expired, non-error controls, keep them.
} else {
ep.controls = controls;
}
if ep.role_controls.len() < self.config.max_roles {
ep.role_controls.insert(role_name, role_controls);
}
}
}
self.role_controls
.insert((endpoint_id, role_name), Err(msg));
}
fn insert_project2endpoint(&self, project_id: ProjectIdInt, endpoint_id: EndpointIdInt) {
@@ -307,73 +198,35 @@ impl ProjectInfoCacheImpl {
}
}
pub fn maybe_invalidate_role_secret(&self, endpoint_id: &EndpointId, role_name: &RoleName) {
let Some(endpoint_id) = EndpointIdInt::get(endpoint_id) else {
return;
};
let Some(role_name) = RoleNameInt::get(role_name) else {
return;
};
let Some(mut endpoint_info) = self.cache.get_mut(&endpoint_id) else {
return;
};
let entry = endpoint_info.role_controls.entry(role_name);
let hash_map::Entry::Occupied(role_controls) = entry else {
return;
};
if role_controls.get().is_expired() {
role_controls.remove();
}
pub fn maybe_invalidate_role_secret(&self, _endpoint_id: &EndpointId, _role_name: &RoleName) {
// TODO: Expire the value early if the key is idle.
// Currently not an issue as we would just use the TTL to decide, which is what already happens.
}
pub async fn gc_worker(&self) -> anyhow::Result<Infallible> {
let mut interval =
tokio::time::interval(self.config.gc_interval / (self.cache.shards().len()) as u32);
let mut interval = tokio::time::interval(self.config.gc_interval);
loop {
interval.tick().await;
if self.cache.len() < self.config.size {
// If there are not too many entries, wait until the next gc cycle.
continue;
}
self.gc();
self.ep_controls.run_pending_tasks();
self.role_controls.run_pending_tasks();
}
}
fn gc(&self) {
let shard = rand::rng().random_range(0..self.project2ep.shards().len());
debug!(shard, "project_info_cache: performing epoch reclamation");
// acquire a random shard lock
let mut removed = 0;
let shard = self.project2ep.shards()[shard].write();
for (_, endpoints) in shard.iter() {
for endpoint in endpoints {
self.cache.remove(endpoint);
removed += 1;
}
}
// We can drop this shard only after making sure that all endpoints are removed.
drop(shard);
info!("project_info_cache: removed {removed} endpoints");
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::time::Duration;
use super::*;
use crate::control_plane::messages::{Details, EndpointRateLimitConfig, ErrorInfo, Status};
use crate::control_plane::{AccessBlockerFlags, AuthSecret};
use crate::scram::ServerSecret;
use std::sync::Arc;
#[tokio::test]
async fn test_project_info_cache_settings() {
tokio::time::pause();
let cache = ProjectInfoCacheImpl::new(ProjectInfoCacheOptions {
size: 2,
let cache = ProjectInfoCache::new(ProjectInfoCacheOptions {
size: 1,
max_roles: 2,
ttl: Duration::from_secs(1),
gc_interval: Duration::from_secs(600),
@@ -423,22 +276,17 @@ mod tests {
},
);
let (cached, ttl) = cache
.get_role_secret_with_ttl(&endpoint_id, &user1)
.unwrap();
let cached = cache.get_role_secret(&endpoint_id, &user1).unwrap();
assert_eq!(cached.unwrap().secret, secret1);
assert_eq!(ttl, cache.config.ttl);
let (cached, ttl) = cache
.get_role_secret_with_ttl(&endpoint_id, &user2)
.unwrap();
let cached = cache.get_role_secret(&endpoint_id, &user2).unwrap();
assert_eq!(cached.unwrap().secret, secret2);
assert_eq!(ttl, cache.config.ttl);
// Shouldn't add more than 2 roles.
let user3: RoleName = "user3".into();
let secret3 = Some(AuthSecret::Scram(ServerSecret::mock([3; 32])));
cache.role_controls.run_pending_tasks();
cache.insert_endpoint_access(
account_id,
project_id,
@@ -455,31 +303,18 @@ mod tests {
},
);
assert!(
cache
.get_role_secret_with_ttl(&endpoint_id, &user3)
.is_none()
);
cache.role_controls.run_pending_tasks();
assert_eq!(cache.role_controls.entry_count(), 2);
let cached = cache
.get_endpoint_access_with_ttl(&endpoint_id)
.unwrap()
.0
.unwrap();
assert_eq!(cached.allowed_ips, allowed_ips);
tokio::time::sleep(Duration::from_secs(2)).await;
tokio::time::advance(Duration::from_secs(2)).await;
let cached = cache.get_role_secret_with_ttl(&endpoint_id, &user1);
assert!(cached.is_none());
let cached = cache.get_role_secret_with_ttl(&endpoint_id, &user2);
assert!(cached.is_none());
let cached = cache.get_endpoint_access_with_ttl(&endpoint_id);
assert!(cached.is_none());
cache.role_controls.run_pending_tasks();
assert_eq!(cache.role_controls.entry_count(), 0);
}
#[tokio::test]
async fn test_caching_project_info_errors() {
let cache = ProjectInfoCacheImpl::new(ProjectInfoCacheOptions {
let cache = ProjectInfoCache::new(ProjectInfoCacheOptions {
size: 10,
max_roles: 10,
ttl: Duration::from_secs(1),
@@ -519,34 +354,23 @@ mod tests {
status: None,
});
let get_role_secret = |endpoint_id, role_name| {
cache
.get_role_secret_with_ttl(endpoint_id, role_name)
.unwrap()
.0
};
let get_endpoint_access =
|endpoint_id| cache.get_endpoint_access_with_ttl(endpoint_id).unwrap().0;
let get_role_secret =
|endpoint_id, role_name| cache.get_role_secret(endpoint_id, role_name).unwrap();
let get_endpoint_access = |endpoint_id| cache.get_endpoint_access(endpoint_id).unwrap();
// stores role-specific errors only for get_role_secret
cache.insert_endpoint_access_err(
(&endpoint_id).into(),
(&user1).into(),
role_msg.clone(),
None,
);
cache.insert_endpoint_access_err((&endpoint_id).into(), (&user1).into(), role_msg.clone());
assert_eq!(
get_role_secret(&endpoint_id, &user1).unwrap_err().error,
role_msg.error
);
assert!(cache.get_endpoint_access_with_ttl(&endpoint_id).is_none());
assert!(cache.get_endpoint_access(&endpoint_id).is_none());
// stores non-role specific errors for both get_role_secret and get_endpoint_access
cache.insert_endpoint_access_err(
(&endpoint_id).into(),
(&user1).into(),
generic_msg.clone(),
None,
);
assert_eq!(
get_role_secret(&endpoint_id, &user1).unwrap_err().error,
@@ -558,11 +382,7 @@ mod tests {
);
// error isn't returned for other roles in the same endpoint
assert!(
cache
.get_role_secret_with_ttl(&endpoint_id, &user2)
.is_none()
);
assert!(cache.get_role_secret(&endpoint_id, &user2).is_none());
// success for a role does not overwrite errors for other roles
cache.insert_endpoint_access(
@@ -590,7 +410,6 @@ mod tests {
(&endpoint_id).into(),
(&user2).into(),
generic_msg.clone(),
None,
);
assert!(get_role_secret(&endpoint_id, &user2).is_err());
assert!(get_endpoint_access(&endpoint_id).is_ok());

View File

@@ -1,262 +0,0 @@
use std::borrow::Borrow;
use std::hash::Hash;
use std::time::{Duration, Instant};
// This seems to make more sense than `lru` or `cached`:
//
// * `near/nearcore` ditched `cached` in favor of `lru`
// (https://github.com/near/nearcore/issues?q=is%3Aissue+lru+is%3Aclosed).
//
// * `lru` methods use an obscure `KeyRef` type in their contraints (which is deliberately excluded from docs).
// This severely hinders its usage both in terms of creating wrappers and supported key types.
//
// On the other hand, `hashlink` has good download stats and appears to be maintained.
use hashlink::{LruCache, linked_hash_map::RawEntryMut};
use tracing::debug;
use super::Cache;
use super::common::Cached;
/// An implementation of timed LRU cache with fixed capacity.
/// Key properties:
///
/// * Whenever a new entry is inserted, the least recently accessed one is evicted.
/// The cache also keeps track of entry's insertion time (`created_at`) and TTL (`expires_at`).
///
/// * If `update_ttl_on_retrieval` is `true`. When the entry is about to be retrieved, we check its expiration timestamp.
/// If the entry has expired, we remove it from the cache; Otherwise we bump the
/// expiration timestamp (e.g. +5mins) and change its place in LRU list to prolong
/// its existence.
///
/// * There's an API for immediate invalidation (removal) of a cache entry;
/// It's useful in case we know for sure that the entry is no longer correct.
/// See [`Cached`] for more information.
///
/// * Expired entries are kept in the cache, until they are evicted by the LRU policy,
/// or by a successful lookup (i.e. the entry hasn't expired yet).
/// There is no background job to reap the expired records.
///
/// * It's possible for an entry that has not yet expired entry to be evicted
/// before expired items. That's a bit wasteful, but probably fine in practice.
pub(crate) struct TimedLru<K, V> {
/// Cache's name for tracing.
name: &'static str,
/// The underlying cache implementation.
cache: parking_lot::Mutex<LruCache<K, Entry<V>>>,
/// Default time-to-live of a single entry.
ttl: Duration,
update_ttl_on_retrieval: bool,
}
impl<K: Hash + Eq, V> Cache for TimedLru<K, V> {
type Key = K;
type Value = V;
type LookupInfo<Key> = Key;
fn invalidate(&self, info: &Self::LookupInfo<K>) {
self.invalidate_raw(info);
}
}
struct Entry<T> {
created_at: Instant,
expires_at: Instant,
ttl: Duration,
update_ttl_on_retrieval: bool,
value: T,
}
impl<K: Hash + Eq, V> TimedLru<K, V> {
/// Construct a new LRU cache with timed entries.
pub(crate) fn new(
name: &'static str,
capacity: usize,
ttl: Duration,
update_ttl_on_retrieval: bool,
) -> Self {
Self {
name,
cache: LruCache::new(capacity).into(),
ttl,
update_ttl_on_retrieval,
}
}
/// Drop an entry from the cache if it's outdated.
#[tracing::instrument(level = "debug", fields(cache = self.name), skip_all)]
fn invalidate_raw(&self, key: &K) {
// Do costly things before taking the lock.
let mut cache = self.cache.lock();
let entry = match cache.raw_entry_mut().from_key(key) {
RawEntryMut::Vacant(_) => return,
RawEntryMut::Occupied(x) => x.remove(),
};
drop(cache); // drop lock before logging
let Entry {
created_at,
expires_at,
..
} = entry;
debug!(
?created_at,
?expires_at,
"processed a cache entry invalidation event"
);
}
/// Try retrieving an entry by its key, then execute `extract` if it exists.
#[tracing::instrument(level = "debug", fields(cache = self.name), skip_all)]
fn get_raw<Q, R>(&self, key: &Q, extract: impl FnOnce(&K, &Entry<V>) -> R) -> Option<R>
where
K: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
let now = Instant::now();
// Do costly things before taking the lock.
let mut cache = self.cache.lock();
let mut raw_entry = match cache.raw_entry_mut().from_key(key) {
RawEntryMut::Vacant(_) => return None,
RawEntryMut::Occupied(x) => x,
};
// Immeditely drop the entry if it has expired.
let entry = raw_entry.get();
if entry.expires_at <= now {
raw_entry.remove();
return None;
}
let value = extract(raw_entry.key(), entry);
let (created_at, expires_at) = (entry.created_at, entry.expires_at);
// Update the deadline and the entry's position in the LRU list.
let deadline = now.checked_add(raw_entry.get().ttl).expect("time overflow");
if raw_entry.get().update_ttl_on_retrieval {
raw_entry.get_mut().expires_at = deadline;
}
raw_entry.to_back();
drop(cache); // drop lock before logging
debug!(
created_at = format_args!("{created_at:?}"),
old_expires_at = format_args!("{expires_at:?}"),
new_expires_at = format_args!("{deadline:?}"),
"accessed a cache entry"
);
Some(value)
}
/// Insert an entry to the cache. If an entry with the same key already
/// existed, return the previous value and its creation timestamp.
#[tracing::instrument(level = "debug", fields(cache = self.name), skip_all)]
fn insert_raw(&self, key: K, value: V) -> (Instant, Option<V>) {
self.insert_raw_ttl(key, value, self.ttl, self.update_ttl_on_retrieval)
}
/// Insert an entry to the cache. If an entry with the same key already
/// existed, return the previous value and its creation timestamp.
#[tracing::instrument(level = "debug", fields(cache = self.name), skip_all)]
fn insert_raw_ttl(
&self,
key: K,
value: V,
ttl: Duration,
update: bool,
) -> (Instant, Option<V>) {
let created_at = Instant::now();
let expires_at = created_at.checked_add(ttl).expect("time overflow");
let entry = Entry {
created_at,
expires_at,
ttl,
update_ttl_on_retrieval: update,
value,
};
// Do costly things before taking the lock.
let old = self
.cache
.lock()
.insert(key, entry)
.map(|entry| entry.value);
debug!(
created_at = format_args!("{created_at:?}"),
expires_at = format_args!("{expires_at:?}"),
replaced = old.is_some(),
"created a cache entry"
);
(created_at, old)
}
}
impl<K: Hash + Eq + Clone, V: Clone> TimedLru<K, V> {
pub(crate) fn insert_ttl(&self, key: K, value: V, ttl: Duration) {
self.insert_raw_ttl(key, value, ttl, false);
}
#[cfg(feature = "rest_broker")]
pub(crate) fn insert(&self, key: K, value: V) {
self.insert_raw_ttl(key, value, self.ttl, self.update_ttl_on_retrieval);
}
pub(crate) fn insert_unit(&self, key: K, value: V) -> (Option<V>, Cached<&Self, ()>) {
let (_, old) = self.insert_raw(key.clone(), value);
let cached = Cached {
token: Some((self, key)),
value: (),
};
(old, cached)
}
#[cfg(feature = "rest_broker")]
pub(crate) fn flush(&self) {
let now = Instant::now();
let mut cache = self.cache.lock();
// Collect keys of expired entries first
let expired_keys: Vec<_> = cache
.iter()
.filter_map(|(key, entry)| {
if entry.expires_at <= now {
Some(key.clone())
} else {
None
}
})
.collect();
// Remove expired entries
for key in expired_keys {
cache.remove(&key);
}
}
}
impl<K: Hash + Eq, V: Clone> TimedLru<K, V> {
/// Retrieve a cached entry in convenient wrapper, alongside timing information.
pub(crate) fn get_with_created_at<Q>(
&self,
key: &Q,
) -> Option<Cached<&Self, (<Self as Cache>::Value, Instant)>>
where
K: Borrow<Q> + Clone,
Q: Hash + Eq + ?Sized,
{
self.get_raw(key, |key, entry| Cached {
token: Some((self, key.clone())),
value: (entry.value.clone(), entry.created_at),
})
}
}

View File

@@ -107,20 +107,23 @@ pub fn remote_storage_from_toml(s: &str) -> anyhow::Result<RemoteStorageConfig>
#[derive(Debug)]
pub struct CacheOptions {
/// Max number of entries.
pub size: usize,
pub size: Option<u64>,
/// Entry's time-to-live.
pub ttl: Duration,
pub absolute_ttl: Option<Duration>,
/// Entry's time-to-idle.
pub idle_ttl: Option<Duration>,
}
impl CacheOptions {
/// Default options for [`crate::control_plane::NodeInfoCache`].
pub const CACHE_DEFAULT_OPTIONS: &'static str = "size=4000,ttl=4m";
/// Default options for [`crate::cache::node_info::NodeInfoCache`].
pub const CACHE_DEFAULT_OPTIONS: &'static str = "size=4000,idle_ttl=4m";
/// Parse cache options passed via cmdline.
/// Example: [`Self::CACHE_DEFAULT_OPTIONS`].
fn parse(options: &str) -> anyhow::Result<Self> {
let mut size = None;
let mut ttl = None;
let mut absolute_ttl = None;
let mut idle_ttl = None;
for option in options.split(',') {
let (key, value) = option
@@ -129,21 +132,34 @@ impl CacheOptions {
match key {
"size" => size = Some(value.parse()?),
"ttl" => ttl = Some(humantime::parse_duration(value)?),
"absolute_ttl" | "ttl" => absolute_ttl = Some(humantime::parse_duration(value)?),
"idle_ttl" | "tti" => idle_ttl = Some(humantime::parse_duration(value)?),
unknown => bail!("unknown key: {unknown}"),
}
}
// TTL doesn't matter if cache is always empty.
if let Some(0) = size {
ttl.get_or_insert(Duration::default());
}
Ok(Self {
size: size.context("missing `size`")?,
ttl: ttl.context("missing `ttl`")?,
size,
absolute_ttl,
idle_ttl,
})
}
pub fn moka<K, V, C>(
&self,
mut builder: moka::sync::CacheBuilder<K, V, C>,
) -> moka::sync::CacheBuilder<K, V, C> {
if let Some(size) = self.size {
builder = builder.max_capacity(size);
}
if let Some(ttl) = self.absolute_ttl {
builder = builder.time_to_live(ttl);
}
if let Some(tti) = self.idle_ttl {
builder = builder.time_to_idle(tti);
}
builder
}
}
impl FromStr for CacheOptions {
@@ -159,17 +175,17 @@ impl FromStr for CacheOptions {
#[derive(Debug)]
pub struct ProjectInfoCacheOptions {
/// Max number of entries.
pub size: usize,
pub size: u64,
/// Entry's time-to-live.
pub ttl: Duration,
/// Max number of roles per endpoint.
pub max_roles: usize,
pub max_roles: u64,
/// Gc interval.
pub gc_interval: Duration,
}
impl ProjectInfoCacheOptions {
/// Default options for [`crate::control_plane::NodeInfoCache`].
/// Default options for [`crate::cache::project_info::ProjectInfoCache`].
pub const CACHE_DEFAULT_OPTIONS: &'static str =
"size=10000,ttl=4m,max_roles=10,gc_interval=60m";
@@ -496,21 +512,37 @@ mod tests {
#[test]
fn test_parse_cache_options() -> anyhow::Result<()> {
let CacheOptions { size, ttl } = "size=4096,ttl=5min".parse()?;
assert_eq!(size, 4096);
assert_eq!(ttl, Duration::from_secs(5 * 60));
let CacheOptions {
size,
absolute_ttl,
idle_ttl: _,
} = "size=4096,ttl=5min".parse()?;
assert_eq!(size, Some(4096));
assert_eq!(absolute_ttl, Some(Duration::from_secs(5 * 60)));
let CacheOptions { size, ttl } = "ttl=4m,size=2".parse()?;
assert_eq!(size, 2);
assert_eq!(ttl, Duration::from_secs(4 * 60));
let CacheOptions {
size,
absolute_ttl,
idle_ttl: _,
} = "ttl=4m,size=2".parse()?;
assert_eq!(size, Some(2));
assert_eq!(absolute_ttl, Some(Duration::from_secs(4 * 60)));
let CacheOptions { size, ttl } = "size=0,ttl=1s".parse()?;
assert_eq!(size, 0);
assert_eq!(ttl, Duration::from_secs(1));
let CacheOptions {
size,
absolute_ttl,
idle_ttl: _,
} = "size=0,ttl=1s".parse()?;
assert_eq!(size, Some(0));
assert_eq!(absolute_ttl, Some(Duration::from_secs(1)));
let CacheOptions { size, ttl } = "size=0".parse()?;
assert_eq!(size, 0);
assert_eq!(ttl, Duration::default());
let CacheOptions {
size,
absolute_ttl,
idle_ttl: _,
} = "size=0".parse()?;
assert_eq!(size, Some(0));
assert_eq!(absolute_ttl, None);
Ok(())
}

View File

@@ -3,7 +3,6 @@
use std::net::IpAddr;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use ::http::HeaderName;
use ::http::header::AUTHORIZATION;
@@ -17,6 +16,8 @@ use tracing::{Instrument, debug, info, info_span, warn};
use super::super::messages::{ControlPlaneErrorMessage, GetEndpointAccessControl, WakeCompute};
use crate::auth::backend::ComputeUserInfo;
use crate::auth::backend::jwt::AuthRule;
use crate::cache::Cached;
use crate::cache::node_info::CachedNodeInfo;
use crate::context::RequestContext;
use crate::control_plane::caches::ApiCaches;
use crate::control_plane::errors::{
@@ -25,8 +26,7 @@ use crate::control_plane::errors::{
use crate::control_plane::locks::ApiLocks;
use crate::control_plane::messages::{ColdStartInfo, EndpointJwksResponse};
use crate::control_plane::{
AccessBlockerFlags, AuthInfo, AuthSecret, CachedNodeInfo, EndpointAccessControl, NodeInfo,
RoleAccessControl,
AccessBlockerFlags, AuthInfo, AuthSecret, EndpointAccessControl, NodeInfo, RoleAccessControl,
};
use crate::metrics::Metrics;
use crate::proxy::retry::CouldRetry;
@@ -118,7 +118,6 @@ impl NeonControlPlaneClient {
cache_key.into(),
role.into(),
msg.clone(),
retry_info.map(|r| Duration::from_millis(r.retry_delay_ms)),
);
Err(err)
@@ -347,18 +346,11 @@ impl super::ControlPlaneApi for NeonControlPlaneClient {
) -> Result<RoleAccessControl, GetAuthInfoError> {
let key = endpoint.normalize();
if let Some((role_control, ttl)) = self
.caches
.project_info
.get_role_secret_with_ttl(&key, role)
{
if let Some(role_control) = self.caches.project_info.get_role_secret(&key, role) {
return match role_control {
Err(mut msg) => {
Err(msg) => {
info!(key = &*key, "found cached get_role_access_control error");
// if retry_delay_ms is set change it to the remaining TTL
replace_retry_delay_ms(&mut msg, |_| ttl.as_millis() as u64);
Err(GetAuthInfoError::ApiError(ControlPlaneError::Message(msg)))
}
Ok(role_control) => {
@@ -383,17 +375,14 @@ impl super::ControlPlaneApi for NeonControlPlaneClient {
) -> Result<EndpointAccessControl, GetAuthInfoError> {
let key = endpoint.normalize();
if let Some((control, ttl)) = self.caches.project_info.get_endpoint_access_with_ttl(&key) {
if let Some(control) = self.caches.project_info.get_endpoint_access(&key) {
return match control {
Err(mut msg) => {
Err(msg) => {
info!(
key = &*key,
"found cached get_endpoint_access_control error"
);
// if retry_delay_ms is set change it to the remaining TTL
replace_retry_delay_ms(&mut msg, |_| ttl.as_millis() as u64);
Err(GetAuthInfoError::ApiError(ControlPlaneError::Message(msg)))
}
Ok(control) => {
@@ -426,17 +415,11 @@ impl super::ControlPlaneApi for NeonControlPlaneClient {
macro_rules! check_cache {
() => {
if let Some(cached) = self.caches.node_info.get_with_created_at(&key) {
let (cached, (info, created_at)) = cached.take_value();
if let Some(info) = self.caches.node_info.get_entry(&key) {
return match info {
Err(mut msg) => {
Err(msg) => {
info!(key = &*key, "found cached wake_compute error");
// if retry_delay_ms is set, reduce it by the amount of time it spent in cache
replace_retry_delay_ms(&mut msg, |delay| {
delay.saturating_sub(created_at.elapsed().as_millis() as u64)
});
Err(WakeComputeError::ControlPlane(ControlPlaneError::Message(
msg,
)))
@@ -444,7 +427,7 @@ impl super::ControlPlaneApi for NeonControlPlaneClient {
Ok(info) => {
debug!(key = &*key, "found cached compute node info");
ctx.set_project(info.aux.clone());
Ok(cached.map(|()| info))
Ok(info)
}
};
}
@@ -483,10 +466,12 @@ impl super::ControlPlaneApi for NeonControlPlaneClient {
let mut stored_node = node.clone();
// store the cached node as 'warm_cached'
stored_node.aux.cold_start_info = ColdStartInfo::WarmCached;
self.caches.node_info.insert(key.clone(), Ok(stored_node));
let (_, cached) = self.caches.node_info.insert_unit(key, Ok(stored_node));
Ok(cached.map(|()| node))
Ok(Cached {
token: Some((&self.caches.node_info, key)),
value: node,
})
}
Err(err) => match err {
WakeComputeError::ControlPlane(ControlPlaneError::Message(ref msg)) => {
@@ -503,11 +488,7 @@ impl super::ControlPlaneApi for NeonControlPlaneClient {
"created a cache entry for the wake compute error"
);
let ttl = retry_info.map_or(Duration::from_secs(30), |r| {
Duration::from_millis(r.retry_delay_ms)
});
self.caches.node_info.insert_ttl(key, Err(msg.clone()), ttl);
self.caches.node_info.insert(key, Err(msg.clone()));
Err(err)
}
@@ -517,14 +498,6 @@ impl super::ControlPlaneApi for NeonControlPlaneClient {
}
}
fn replace_retry_delay_ms(msg: &mut ControlPlaneErrorMessage, f: impl FnOnce(u64) -> u64) {
if let Some(status) = &mut msg.status
&& let Some(retry_info) = &mut status.details.retry_info
{
retry_info.retry_delay_ms = f(retry_info.retry_delay_ms);
}
}
/// Parse http response body, taking status code into account.
fn parse_body<T: for<'a> serde::Deserialize<'a>>(
status: StatusCode,

View File

@@ -15,6 +15,7 @@ use crate::auth::IpPattern;
use crate::auth::backend::ComputeUserInfo;
use crate::auth::backend::jwt::AuthRule;
use crate::cache::Cached;
use crate::cache::node_info::CachedNodeInfo;
use crate::compute::ConnectInfo;
use crate::context::RequestContext;
use crate::control_plane::errors::{
@@ -22,8 +23,7 @@ use crate::control_plane::errors::{
};
use crate::control_plane::messages::{EndpointRateLimitConfig, MetricsAuxInfo};
use crate::control_plane::{
AccessBlockerFlags, AuthInfo, AuthSecret, CachedNodeInfo, EndpointAccessControl, NodeInfo,
RoleAccessControl,
AccessBlockerFlags, AuthInfo, AuthSecret, EndpointAccessControl, NodeInfo, RoleAccessControl,
};
use crate::intern::RoleNameInt;
use crate::scram;

View File

@@ -13,10 +13,11 @@ use tracing::{debug, info};
use super::{EndpointAccessControl, RoleAccessControl};
use crate::auth::backend::ComputeUserInfo;
use crate::auth::backend::jwt::{AuthRule, FetchAuthRules, FetchAuthRulesError};
use crate::cache::project_info::ProjectInfoCacheImpl;
use crate::cache::node_info::{CachedNodeInfo, NodeInfoCache};
use crate::cache::project_info::ProjectInfoCache;
use crate::config::{CacheOptions, ProjectInfoCacheOptions};
use crate::context::RequestContext;
use crate::control_plane::{CachedNodeInfo, ControlPlaneApi, NodeInfoCache, errors};
use crate::control_plane::{ControlPlaneApi, errors};
use crate::error::ReportableError;
use crate::metrics::ApiLockMetrics;
use crate::rate_limiter::{DynamicLimiter, Outcome, RateLimiterConfig, Token};
@@ -119,7 +120,7 @@ pub struct ApiCaches {
/// Cache for the `wake_compute` API method.
pub(crate) node_info: NodeInfoCache,
/// Cache which stores project_id -> endpoint_ids mapping.
pub project_info: Arc<ProjectInfoCacheImpl>,
pub project_info: Arc<ProjectInfoCache>,
}
impl ApiCaches {
@@ -128,13 +129,8 @@ impl ApiCaches {
project_info_cache_config: ProjectInfoCacheOptions,
) -> Self {
Self {
node_info: NodeInfoCache::new(
"node_info_cache",
wake_compute_cache_config.size,
wake_compute_cache_config.ttl,
true,
),
project_info: Arc::new(ProjectInfoCacheImpl::new(project_info_cache_config)),
node_info: NodeInfoCache::new(wake_compute_cache_config),
project_info: Arc::new(ProjectInfoCache::new(project_info_cache_config)),
}
}
}

View File

@@ -1,8 +1,10 @@
use std::fmt::{self, Display};
use std::time::Duration;
use measured::FixedCardinalityLabel;
use serde::{Deserialize, Serialize};
use smol_str::SmolStr;
use tokio::time::Instant;
use crate::auth::IpPattern;
use crate::intern::{AccountIdInt, BranchIdInt, EndpointIdInt, ProjectIdInt, RoleNameInt};
@@ -231,7 +233,13 @@ impl Reason {
#[derive(Copy, Clone, Debug, Deserialize)]
#[allow(dead_code)]
pub(crate) struct RetryInfo {
pub(crate) retry_delay_ms: u64,
#[serde(rename = "retry_delay_ms", deserialize_with = "milliseconds_from_now")]
pub(crate) retry_at: Instant,
}
fn milliseconds_from_now<'de, D: serde::Deserializer<'de>>(d: D) -> Result<Instant, D::Error> {
let millis = u64::deserialize(d)?;
Ok(Instant::now() + Duration::from_millis(millis))
}
#[derive(Debug, Deserialize, Clone)]

View File

@@ -16,13 +16,13 @@ use messages::EndpointRateLimitConfig;
use crate::auth::backend::ComputeUserInfo;
use crate::auth::backend::jwt::AuthRule;
use crate::auth::{AuthError, IpPattern, check_peer_addr_is_in_list};
use crate::cache::{Cached, TimedLru};
use crate::cache::node_info::CachedNodeInfo;
use crate::context::RequestContext;
use crate::control_plane::messages::{ControlPlaneErrorMessage, MetricsAuxInfo};
use crate::control_plane::messages::MetricsAuxInfo;
use crate::intern::{AccountIdInt, EndpointIdInt, ProjectIdInt};
use crate::protocol2::ConnectionInfoExtra;
use crate::rate_limiter::{EndpointRateLimiter, LeakyBucketConfig};
use crate::types::{EndpointCacheKey, EndpointId, RoleName};
use crate::types::{EndpointId, RoleName};
use crate::{compute, scram};
/// Various cache-related types.
@@ -77,10 +77,6 @@ pub(crate) struct AccessBlockerFlags {
pub vpc_access_blocked: bool,
}
pub(crate) type NodeInfoCache =
TimedLru<EndpointCacheKey, Result<NodeInfo, Box<ControlPlaneErrorMessage>>>;
pub(crate) type CachedNodeInfo = Cached<&'static NodeInfoCache, NodeInfo>;
#[derive(Clone, Debug)]
pub struct RoleAccessControl {
pub secret: Option<AuthSecret>,

View File

@@ -2,7 +2,8 @@ use std::sync::{Arc, OnceLock};
use lasso::ThreadedRodeo;
use measured::label::{
FixedCardinalitySet, LabelGroupSet, LabelName, LabelSet, LabelValue, StaticLabelSet,
FixedCardinalitySet, LabelGroupSet, LabelGroupVisitor, LabelName, LabelSet, LabelValue,
StaticLabelSet,
};
use measured::metric::histogram::Thresholds;
use measured::metric::name::MetricName;
@@ -10,7 +11,7 @@ use measured::{
Counter, CounterVec, FixedCardinalityLabel, Gauge, Histogram, HistogramVec, LabelGroup,
MetricGroup,
};
use metrics::{CounterPairAssoc, CounterPairVec, HyperLogLogVec};
use metrics::{CounterPairAssoc, CounterPairVec, HyperLogLogVec, InfoMetric};
use tokio::time::{self, Instant};
use crate::control_plane::messages::ColdStartInfo;
@@ -25,6 +26,9 @@ pub struct Metrics {
#[metric(namespace = "wake_compute_lock")]
pub wake_compute_lock: ApiLockMetrics,
#[metric(namespace = "service")]
pub service: ServiceMetrics,
}
static SELF: OnceLock<Metrics> = OnceLock::new();
@@ -660,3 +664,43 @@ pub struct ThreadPoolMetrics {
#[metric(init = CounterVec::with_label_set(ThreadPoolWorkers(workers)))]
pub worker_task_skips_total: CounterVec<ThreadPoolWorkers>,
}
#[derive(MetricGroup, Default)]
pub struct ServiceMetrics {
pub info: InfoMetric<ServiceInfo>,
}
#[derive(Default)]
pub struct ServiceInfo {
pub state: ServiceState,
}
impl ServiceInfo {
pub const fn running() -> Self {
ServiceInfo {
state: ServiceState::Running,
}
}
pub const fn terminating() -> Self {
ServiceInfo {
state: ServiceState::Terminating,
}
}
}
impl LabelGroup for ServiceInfo {
fn visit_values(&self, v: &mut impl LabelGroupVisitor) {
const STATE: &LabelName = LabelName::from_str("state");
v.write_value(STATE, &self.state);
}
}
#[derive(FixedCardinalityLabel, Clone, Copy, Debug, Default)]
#[label(singleton = "state")]
pub enum ServiceState {
#[default]
Init,
Running,
Terminating,
}

View File

@@ -2,7 +2,7 @@ use thiserror::Error;
use crate::auth::Backend;
use crate::auth::backend::ComputeUserInfo;
use crate::cache::Cache;
use crate::cache::common::Cache;
use crate::compute::{AuthInfo, ComputeConnection, ConnectionError, PostgresError};
use crate::config::ProxyConfig;
use crate::context::RequestContext;

View File

@@ -1,11 +1,12 @@
use tokio::time;
use tracing::{debug, info, warn};
use crate::cache::node_info::CachedNodeInfo;
use crate::compute::{self, COULD_NOT_CONNECT, ComputeConnection};
use crate::config::{ComputeConfig, ProxyConfig, RetryConfig};
use crate::context::RequestContext;
use crate::control_plane::NodeInfo;
use crate::control_plane::locks::ApiLocks;
use crate::control_plane::{self, NodeInfo};
use crate::metrics::{
ConnectOutcome, ConnectionFailureKind, Metrics, RetriesMetricGroup, RetryType,
};
@@ -17,7 +18,7 @@ use crate::types::Host;
/// (e.g. the compute node's address might've changed at the wrong time).
/// Invalidate the cache entry (if any) to prevent subsequent errors.
#[tracing::instrument(skip_all)]
pub(crate) fn invalidate_cache(node_info: control_plane::CachedNodeInfo) -> NodeInfo {
pub(crate) fn invalidate_cache(node_info: CachedNodeInfo) -> NodeInfo {
let is_cached = node_info.cached();
if is_cached {
warn!("invalidating stalled compute node info cache entry");
@@ -37,7 +38,7 @@ pub(crate) trait ConnectMechanism {
async fn connect_once(
&self,
ctx: &RequestContext,
node_info: &control_plane::CachedNodeInfo,
node_info: &CachedNodeInfo,
config: &ComputeConfig,
) -> Result<Self::Connection, compute::ConnectionError>;
}
@@ -66,7 +67,7 @@ impl ConnectMechanism for TcpMechanism<'_> {
async fn connect_once(
&self,
ctx: &RequestContext,
node_info: &control_plane::CachedNodeInfo,
node_info: &CachedNodeInfo,
config: &ComputeConfig,
) -> Result<ComputeConnection, compute::ConnectionError> {
let permit = self.locks.get_permit(&node_info.conn_info.host).await?;

View File

@@ -15,15 +15,17 @@ use rstest::rstest;
use rustls::crypto::ring;
use rustls::pki_types;
use tokio::io::{AsyncRead, AsyncWrite, DuplexStream};
use tokio::time::Instant;
use tracing_test::traced_test;
use super::retry::CouldRetry;
use crate::auth::backend::{ComputeUserInfo, MaybeOwned};
use crate::config::{ComputeConfig, RetryConfig, TlsConfig};
use crate::cache::node_info::{CachedNodeInfo, NodeInfoCache};
use crate::config::{CacheOptions, ComputeConfig, RetryConfig, TlsConfig};
use crate::context::RequestContext;
use crate::control_plane::client::{ControlPlaneClient, TestControlPlaneClient};
use crate::control_plane::messages::{ControlPlaneErrorMessage, Details, MetricsAuxInfo, Status};
use crate::control_plane::{self, CachedNodeInfo, NodeInfo, NodeInfoCache};
use crate::control_plane::{self, NodeInfo};
use crate::error::ErrorKind;
use crate::pglb::ERR_INSECURE_CONNECTION;
use crate::pglb::handshake::{HandshakeData, handshake};
@@ -417,12 +419,11 @@ impl TestConnectMechanism {
Self {
counter: Arc::new(std::sync::Mutex::new(0)),
sequence,
cache: Box::leak(Box::new(NodeInfoCache::new(
"test",
1,
Duration::from_secs(100),
false,
))),
cache: Box::leak(Box::new(NodeInfoCache::new(CacheOptions {
size: Some(1),
absolute_ttl: Some(Duration::from_secs(100)),
idle_ttl: None,
}))),
}
}
}
@@ -436,7 +437,7 @@ impl ConnectMechanism for TestConnectMechanism {
async fn connect_once(
&self,
_ctx: &RequestContext,
_node_info: &control_plane::CachedNodeInfo,
_node_info: &CachedNodeInfo,
_config: &ComputeConfig,
) -> Result<Self::Connection, compute::ConnectionError> {
let mut counter = self.counter.lock().unwrap();
@@ -501,7 +502,7 @@ impl TestControlPlaneClient for TestConnectMechanism {
details: Details {
error_info: None,
retry_info: Some(control_plane::messages::RetryInfo {
retry_delay_ms: 1,
retry_at: Instant::now() + Duration::from_millis(1),
}),
user_facing_message: None,
},
@@ -546,8 +547,11 @@ fn helper_create_uncached_node_info() -> NodeInfo {
fn helper_create_cached_node_info(cache: &'static NodeInfoCache) -> CachedNodeInfo {
let node = helper_create_uncached_node_info();
let (_, node2) = cache.insert_unit("key".into(), Ok(node.clone()));
node2.map(|()| node)
cache.insert("key".into(), Ok(node.clone()));
CachedNodeInfo {
token: Some((cache, "key".into())),
value: node,
}
}
fn helper_create_connect_info(

View File

@@ -1,9 +1,9 @@
use async_trait::async_trait;
use tracing::{error, info};
use crate::cache::node_info::CachedNodeInfo;
use crate::config::RetryConfig;
use crate::context::RequestContext;
use crate::control_plane::CachedNodeInfo;
use crate::control_plane::errors::{ControlPlaneError, WakeComputeError};
use crate::error::ReportableError;
use crate::metrics::{

View File

@@ -131,11 +131,11 @@ where
Ok(())
}
struct MessageHandler<C: ProjectInfoCache + Send + Sync + 'static> {
struct MessageHandler<C: Send + Sync + 'static> {
cache: Arc<C>,
}
impl<C: ProjectInfoCache + Send + Sync + 'static> Clone for MessageHandler<C> {
impl<C: Send + Sync + 'static> Clone for MessageHandler<C> {
fn clone(&self) -> Self {
Self {
cache: self.cache.clone(),
@@ -143,8 +143,8 @@ impl<C: ProjectInfoCache + Send + Sync + 'static> Clone for MessageHandler<C> {
}
}
impl<C: ProjectInfoCache + Send + Sync + 'static> MessageHandler<C> {
pub(crate) fn new(cache: Arc<C>) -> Self {
impl MessageHandler<ProjectInfoCache> {
pub(crate) fn new(cache: Arc<ProjectInfoCache>) -> Self {
Self { cache }
}
@@ -224,7 +224,7 @@ impl<C: ProjectInfoCache + Send + Sync + 'static> MessageHandler<C> {
}
}
fn invalidate_cache<C: ProjectInfoCache>(cache: Arc<C>, msg: Notification) {
fn invalidate_cache(cache: Arc<ProjectInfoCache>, msg: Notification) {
match msg {
Notification::EndpointSettingsUpdate(ids) => ids
.iter()
@@ -247,8 +247,8 @@ fn invalidate_cache<C: ProjectInfoCache>(cache: Arc<C>, msg: Notification) {
}
}
async fn handle_messages<C: ProjectInfoCache + Send + Sync + 'static>(
handler: MessageHandler<C>,
async fn handle_messages(
handler: MessageHandler<ProjectInfoCache>,
redis: ConnectionWithCredentialsProvider,
cancellation_token: CancellationToken,
) -> anyhow::Result<()> {
@@ -284,13 +284,10 @@ async fn handle_messages<C: ProjectInfoCache + Send + Sync + 'static>(
/// Handle console's invalidation messages.
#[tracing::instrument(name = "redis_notifications", skip_all)]
pub async fn task_main<C>(
pub async fn task_main(
redis: ConnectionWithCredentialsProvider,
cache: Arc<C>,
) -> anyhow::Result<Infallible>
where
C: ProjectInfoCache + Send + Sync + 'static,
{
cache: Arc<ProjectInfoCache>,
) -> anyhow::Result<Infallible> {
let handler = MessageHandler::new(cache);
// 6h - 1m.
// There will be 1 minute overlap between two tasks. But at least we can be sure that no message is lost.

View File

@@ -12,6 +12,7 @@ use hyper::body::Incoming;
use hyper::http::{HeaderName, HeaderValue};
use hyper::{Request, Response, StatusCode};
use indexmap::IndexMap;
use moka::sync::Cache;
use ouroboros::self_referencing;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Deserializer};
@@ -53,7 +54,6 @@ use super::http_util::{
};
use super::json::JsonConversionError;
use crate::auth::backend::ComputeCredentialKeys;
use crate::cache::{Cached, TimedLru};
use crate::config::ProxyConfig;
use crate::context::RequestContext;
use crate::error::{ErrorKind, ReportableError, UserFacingError};
@@ -138,8 +138,15 @@ pub struct ApiConfig {
}
// The DbSchemaCache is a cache of the ApiConfig and DbSchemaOwned for each endpoint
pub(crate) type DbSchemaCache = TimedLru<EndpointCacheKey, Arc<(ApiConfig, DbSchemaOwned)>>;
pub(crate) struct DbSchemaCache(pub Cache<EndpointCacheKey, Arc<(ApiConfig, DbSchemaOwned)>>);
impl DbSchemaCache {
pub fn new(config: crate::config::CacheOptions) -> Self {
let builder = Cache::builder().name("db_schema_cache");
let builder = config.moka(builder);
Self(builder.build())
}
pub async fn get_cached_or_remote(
&self,
endpoint_id: &EndpointCacheKey,
@@ -149,8 +156,8 @@ impl DbSchemaCache {
ctx: &RequestContext,
config: &'static ProxyConfig,
) -> Result<Arc<(ApiConfig, DbSchemaOwned)>, RestError> {
match self.get_with_created_at(endpoint_id) {
Some(Cached { value: (v, _), .. }) => Ok(v),
match self.0.get(endpoint_id) {
Some(v) => Ok(v),
None => {
info!("db_schema cache miss for endpoint: {:?}", endpoint_id);
let remote_value = self
@@ -173,7 +180,7 @@ impl DbSchemaCache {
db_extra_search_path: None,
};
let value = Arc::new((api_config, schema_owned));
self.insert(endpoint_id.clone(), value);
self.0.insert(endpoint_id.clone(), value);
return Err(e);
}
Err(e) => {
@@ -181,7 +188,7 @@ impl DbSchemaCache {
}
};
let value = Arc::new((api_config, schema_owned));
self.insert(endpoint_id.clone(), value.clone());
self.0.insert(endpoint_id.clone(), value.clone());
Ok(value)
}
}

View File

@@ -4,6 +4,8 @@ use anyhow::bail;
use tokio_util::sync::CancellationToken;
use tracing::{info, warn};
use crate::metrics::{Metrics, ServiceInfo};
/// Handle unix signals appropriately.
pub async fn handle<F>(
token: CancellationToken,
@@ -28,10 +30,12 @@ where
// Shut down the whole application.
_ = interrupt.recv() => {
warn!("received SIGINT, exiting immediately");
Metrics::get().service.info.set_label(ServiceInfo::terminating());
bail!("interrupted");
}
_ = terminate.recv() => {
warn!("received SIGTERM, shutting down once all existing connections have closed");
Metrics::get().service.info.set_label(ServiceInfo::terminating());
token.cancel();
}
}

View File

@@ -981,6 +981,7 @@ impl Reconciler {
));
}
let mut first_err = None;
for (node, conf) in changes {
if self.cancel.is_cancelled() {
return Err(ReconcileError::Cancel);
@@ -990,7 +991,12 @@ impl Reconciler {
// shard _available_ (the attached location), and configuring secondary locations
// can be done lazily when the node becomes available (via background reconciliation).
if node.is_available() {
self.location_config(&node, conf, None, false).await?;
let res = self.location_config(&node, conf, None, false).await;
if let Err(err) = res {
if first_err.is_none() {
first_err = Some(err);
}
}
} else {
// If the node is unavailable, we skip and consider the reconciliation successful: this
// is a common case where a pageserver is marked unavailable: we demote a location on
@@ -1002,6 +1008,10 @@ impl Reconciler {
}
}
if let Some(err) = first_err {
return Err(err);
}
// The condition below identifies a detach. We must have no attached intent and
// must have been attached to something previously. Pass this information to
// the [`ComputeHook`] such that it can update its tenant-wide state.

View File

@@ -1530,10 +1530,19 @@ impl Service {
// so that waiters will see the correct error after waiting.
tenant.set_last_error(result.sequence, e);
// Skip deletions on reconcile failures
let upsert_deltas =
deltas.filter(|delta| matches!(delta, ObservedStateDelta::Upsert(_)));
tenant.apply_observed_deltas(upsert_deltas);
// If the reconciliation failed, don't clear the observed state for places where we
// detached. Instead, mark the observed state as uncertain.
let failed_reconcile_deltas = deltas.map(|delta| {
if let ObservedStateDelta::Delete(node_id) = delta {
ObservedStateDelta::Upsert(Box::new((
node_id,
ObservedStateLocation { conf: None },
)))
} else {
delta
}
});
tenant.apply_observed_deltas(failed_reconcile_deltas);
}
}

View File

@@ -16,6 +16,7 @@ from typing_extensions import override
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
Endpoint,
NeonEnv,
PgBin,
PgProtocol,
@@ -129,6 +130,10 @@ class NeonCompare(PgCompare):
# Start pg
self._pg = self.env.endpoints.create_start("main", "main", self.tenant)
@property
def endpoint(self) -> Endpoint:
return self._pg
@property
@override
def pg(self) -> PgProtocol:

View File

@@ -79,18 +79,28 @@ class EndpointHttpClient(requests.Session):
return json
def prewarm_lfc(self, from_endpoint_id: str | None = None):
"""
Prewarm LFC cache from given endpoint and wait till it finishes or errors
"""
params = {"from_endpoint": from_endpoint_id} if from_endpoint_id else dict()
self.post(self.prewarm_url, params=params).raise_for_status()
self.prewarm_lfc_wait()
def prewarm_lfc_wait(self):
"""
Wait till LFC prewarm returns with error or success.
If prewarm was not requested before calling this function, it will error
"""
statuses = "failed", "completed", "skipped"
def prewarmed():
json = self.prewarm_lfc_status()
status, err = json["status"], json.get("error")
assert status in ["failed", "completed", "skipped"], f"{status}, {err=}"
assert status in statuses, f"{status}, {err=}"
wait_until(prewarmed, timeout=60)
assert self.prewarm_lfc_status()["status"] != "failed"
res = self.prewarm_lfc_status()
assert res["status"] != "failed", res
def offload_lfc_status(self) -> dict[str, str]:
res = self.get(self.offload_url)
@@ -99,17 +109,26 @@ class EndpointHttpClient(requests.Session):
return json
def offload_lfc(self):
"""
Offload LFC cache to endpoint storage and wait till offload finishes or errors
"""
self.post(self.offload_url).raise_for_status()
self.offload_lfc_wait()
def offload_lfc_wait(self):
"""
Wait till LFC offload returns with error or success.
If offload was not requested before calling this function, it will error
"""
def offloaded():
json = self.offload_lfc_status()
status, err = json["status"], json.get("error")
assert status in ["failed", "completed"], f"{status}, {err=}"
wait_until(offloaded)
assert self.offload_lfc_status()["status"] != "failed"
wait_until(offloaded, timeout=60)
res = self.offload_lfc_status()
assert res["status"] != "failed", res
def promote(self, promote_spec: dict[str, Any], disconnect: bool = False):
url = f"http://localhost:{self.external_port}/promote"

View File

@@ -1,5 +1,6 @@
from __future__ import annotations
import re
import time
from typing import TYPE_CHECKING, cast, final
@@ -13,6 +14,17 @@ if TYPE_CHECKING:
from fixtures.pg_version import PgVersion
def connstr_to_env(connstr: str) -> dict[str, str]:
# postgresql://neondb_owner:npg_kuv6Rqi1cB@ep-old-silence-w26pxsvz-pooler.us-east-2.aws.neon.build/neondb?sslmode=require&channel_binding=...'
parts = re.split(r":|@|\/|\?", connstr.removeprefix("postgresql://"))
return {
"PGUSER": parts[0],
"PGPASSWORD": parts[1],
"PGHOST": parts[2],
"PGDATABASE": parts[3],
}
def connection_parameters_to_env(params: dict[str, str]) -> dict[str, str]:
return {
"PGHOST": params["host"],

View File

@@ -2,45 +2,48 @@ from __future__ import annotations
import os
import timeit
import traceback
from concurrent.futures import ThreadPoolExecutor as Exec
from pathlib import Path
from threading import Thread
from time import sleep
from typing import TYPE_CHECKING, Any, cast
from typing import TYPE_CHECKING, cast
import pytest
from fixtures.benchmark_fixture import NeonBenchmarker, PgBenchRunResult
from fixtures.log_helper import log
from fixtures.neon_api import NeonAPI, connection_parameters_to_env
from fixtures.neon_api import NeonAPI, connstr_to_env
from performance.test_perf_pgbench import utc_now_timestamp
if TYPE_CHECKING:
from fixtures.compare_fixtures import NeonCompare
from fixtures.neon_fixtures import Endpoint, PgBin
from fixtures.pg_version import PgVersion
from performance.test_perf_pgbench import utc_now_timestamp
# These tests compare performance for a write-heavy and read-heavy workloads of an ordinary endpoint
# compared to the endpoint which saves its LFC and prewarms using it on startup.
# compared to the endpoint which saves its LFC and prewarms using it on startup
def test_compare_prewarmed_pgbench_perf(neon_compare: NeonCompare):
env = neon_compare.env
env.create_branch("normal")
env.create_branch("prewarmed")
pg_bin = neon_compare.pg_bin
ep_normal: Endpoint = env.endpoints.create_start("normal")
ep_prewarmed: Endpoint = env.endpoints.create_start("prewarmed", autoprewarm=True)
ep_ordinary: Endpoint = neon_compare.endpoint
ep_prewarmed: Endpoint = env.endpoints.create_start("prewarmed")
for ep in [ep_normal, ep_prewarmed]:
for ep in [ep_ordinary, ep_prewarmed]:
connstr: str = ep.connstr()
pg_bin.run(["pgbench", "-i", "-I", "dtGvp", connstr, "-s100"])
ep.safe_psql("CREATE EXTENSION neon")
client = ep.http_client()
client.offload_lfc()
ep.stop()
ep.start()
client.prewarm_lfc_wait()
ep.safe_psql("CREATE SCHEMA neon; CREATE EXTENSION neon WITH SCHEMA neon")
if ep == ep_prewarmed:
client = ep.http_client()
client.offload_lfc()
ep.stop()
ep.start(autoprewarm=True)
client.prewarm_lfc_wait()
else:
ep.stop()
ep.start()
run_start_timestamp = utc_now_timestamp()
t0 = timeit.default_timer()
@@ -59,6 +62,36 @@ def test_compare_prewarmed_pgbench_perf(neon_compare: NeonCompare):
neon_compare.zenbenchmark.record_pg_bench_result(name, res)
def test_compare_prewarmed_read_perf(neon_compare: NeonCompare):
env = neon_compare.env
env.create_branch("prewarmed")
ep_ordinary: Endpoint = neon_compare.endpoint
ep_prewarmed: Endpoint = env.endpoints.create_start("prewarmed")
sql = [
"CREATE SCHEMA neon",
"CREATE EXTENSION neon WITH SCHEMA neon",
"CREATE TABLE foo(key serial primary key, t text default 'foooooooooooooooooooooooooooooooooooooooooooooooooooo')",
"INSERT INTO foo SELECT FROM generate_series(1,1000000)",
]
sql_check = "SELECT count(*) from foo"
ep_ordinary.safe_psql_many(sql)
ep_ordinary.stop()
ep_ordinary.start()
with neon_compare.record_duration("ordinary_run_duration"):
ep_ordinary.safe_psql(sql_check)
ep_prewarmed.safe_psql_many(sql)
client = ep_prewarmed.http_client()
client.offload_lfc()
ep_prewarmed.stop()
ep_prewarmed.start(autoprewarm=True)
client.prewarm_lfc_wait()
with neon_compare.record_duration("prewarmed_run_duration"):
ep_prewarmed.safe_psql(sql_check)
@pytest.mark.remote_cluster
@pytest.mark.timeout(2 * 60 * 60)
def test_compare_prewarmed_pgbench_perf_benchmark(
@@ -67,67 +100,66 @@ def test_compare_prewarmed_pgbench_perf_benchmark(
pg_version: PgVersion,
zenbenchmark: NeonBenchmarker,
):
name = f"Test prewarmed pgbench performance, GITHUB_RUN_ID={os.getenv('GITHUB_RUN_ID')}"
project = neon_api.create_project(pg_version, name)
project_id = project["project"]["id"]
neon_api.wait_for_operation_to_finish(project_id)
err = False
try:
benchmark_impl(pg_bin, neon_api, project, zenbenchmark)
except Exception as e:
err = True
log.error(f"Caught exception: {e}")
log.error(traceback.format_exc())
finally:
assert not err
neon_api.delete_project(project_id)
"""
Prewarm API is not public, so this test relies on a pre-created project
with pgbench size of 3424, pgbench -i -IdtGvp -s3424. Sleeping and
offloading constants are hardcoded to this size as well
"""
project_id = os.getenv("PROJECT_ID")
assert project_id
ordinary_branch_id = ""
prewarmed_branch_id = ""
for branch in neon_api.get_branches(project_id)["branches"]:
if branch["name"] == "ordinary":
ordinary_branch_id = branch["id"]
if branch["name"] == "prewarmed":
prewarmed_branch_id = branch["id"]
assert len(ordinary_branch_id) > 0
assert len(prewarmed_branch_id) > 0
ep_ordinary = None
ep_prewarmed = None
for ep in neon_api.get_endpoints(project_id)["endpoints"]:
if ep["branch_id"] == ordinary_branch_id:
ep_ordinary = ep
if ep["branch_id"] == prewarmed_branch_id:
ep_prewarmed = ep
assert ep_ordinary
assert ep_prewarmed
ordinary_id = ep_ordinary["id"]
prewarmed_id = ep_prewarmed["id"]
def benchmark_impl(
pg_bin: PgBin, neon_api: NeonAPI, project: dict[str, Any], zenbenchmark: NeonBenchmarker
):
pgbench_size = int(os.getenv("PGBENCH_SIZE") or "3424") # 50GB
offload_secs = 20
test_duration_min = 5
test_duration_min = 3
pgbench_duration = f"-T{test_duration_min * 60}"
# prewarm API is not publicly exposed. In order to test performance of a
# fully prewarmed endpoint, wait after it restarts.
# The number here is empirical, based on manual runs on staging
pgbench_init_cmd = ["pgbench", "-P10", "-n", "-c10", pgbench_duration, "-Mprepared"]
pgbench_perf_cmd = pgbench_init_cmd + ["-S"]
prewarmed_sleep_secs = 180
branch_id = project["branch"]["id"]
project_id = project["project"]["id"]
normal_env = connection_parameters_to_env(
project["connection_uris"][0]["connection_parameters"]
)
normal_id = project["endpoints"][0]["id"]
prewarmed_branch_id = neon_api.create_branch(
project_id, "prewarmed", parent_id=branch_id, add_endpoint=False
)["branch"]["id"]
neon_api.wait_for_operation_to_finish(project_id)
ep_prewarmed = neon_api.create_endpoint(
project_id,
prewarmed_branch_id,
endpoint_type="read_write",
settings={"autoprewarm": True, "offload_lfc_interval_seconds": offload_secs},
)
neon_api.wait_for_operation_to_finish(project_id)
prewarmed_env = normal_env.copy()
prewarmed_env["PGHOST"] = ep_prewarmed["endpoint"]["host"]
prewarmed_id = ep_prewarmed["endpoint"]["id"]
ordinary_uri = neon_api.get_connection_uri(project_id, ordinary_branch_id, ordinary_id)["uri"]
prewarmed_uri = neon_api.get_connection_uri(project_id, prewarmed_branch_id, prewarmed_id)[
"uri"
]
def bench(endpoint_name, endpoint_id, env):
pg_bin.run(["pgbench", "-i", "-I", "dtGvp", f"-s{pgbench_size}"], env)
sleep(offload_secs * 2) # ensure LFC is offloaded after pgbench finishes
neon_api.restart_endpoint(project_id, endpoint_id)
sleep(prewarmed_sleep_secs)
log.info(f"Running pgbench for {pgbench_duration}s to warm up the cache")
pg_bin.run_capture(pgbench_init_cmd, env) # capture useful for debugging
log.info(f"Initialized {endpoint_name}")
if endpoint_name == "prewarmed":
log.info(f"sleeping {offload_secs * 2} to ensure LFC is offloaded")
sleep(offload_secs * 2)
neon_api.restart_endpoint(project_id, endpoint_id)
log.info(f"sleeping {prewarmed_sleep_secs} to ensure LFC is prewarmed")
sleep(prewarmed_sleep_secs)
else:
neon_api.restart_endpoint(project_id, endpoint_id)
log.info(f"Starting benchmark for {endpoint_name}")
run_start_timestamp = utc_now_timestamp()
t0 = timeit.default_timer()
out = pg_bin.run_capture(["pgbench", "-c10", pgbench_duration, "-Mprepared"], env)
out = pg_bin.run_capture(pgbench_perf_cmd, env)
run_duration = timeit.default_timer() - t0
run_end_timestamp = utc_now_timestamp()
@@ -140,29 +172,9 @@ def benchmark_impl(
)
zenbenchmark.record_pg_bench_result(endpoint_name, res)
with Exec(max_workers=2) as exe:
exe.submit(bench, "normal", normal_id, normal_env)
exe.submit(bench, "prewarmed", prewarmed_id, prewarmed_env)
prewarmed_args = ("prewarmed", prewarmed_id, connstr_to_env(prewarmed_uri))
prewarmed_thread = Thread(target=bench, args=prewarmed_args)
prewarmed_thread.start()
def test_compare_prewarmed_read_perf(neon_compare: NeonCompare):
env = neon_compare.env
env.create_branch("normal")
env.create_branch("prewarmed")
ep_normal: Endpoint = env.endpoints.create_start("normal")
ep_prewarmed: Endpoint = env.endpoints.create_start("prewarmed", autoprewarm=True)
sql = [
"CREATE EXTENSION neon",
"CREATE TABLE foo(key serial primary key, t text default 'foooooooooooooooooooooooooooooooooooooooooooooooooooo')",
"INSERT INTO foo SELECT FROM generate_series(1,1000000)",
]
for ep in [ep_normal, ep_prewarmed]:
ep.safe_psql_many(sql)
client = ep.http_client()
client.offload_lfc()
ep.stop()
ep.start()
client.prewarm_lfc_wait()
with neon_compare.record_duration(f"{ep.branch_name}_run_duration"):
ep.safe_psql("SELECT count(*) from foo")
bench("ordinary", ordinary_id, connstr_to_env(ordinary_uri))
prewarmed_thread.join()

View File

@@ -3309,6 +3309,7 @@ def test_ps_unavailable_after_delete(
ps.allowed_errors.append(".*request was dropped before completing.*")
env.storage_controller.node_delete(ps.id, force=True)
wait_until(lambda: assert_nodes_count(2))
env.storage_controller.reconcile_until_idle()
elif deletion_api == DeletionAPIKind.OLD:
env.storage_controller.node_delete_old(ps.id)
assert_nodes_count(2)
@@ -4959,3 +4960,49 @@ def test_storage_controller_forward_404(neon_env_builder: NeonEnvBuilder):
env.storage_controller.configure_failpoints(
("reconciler-live-migrate-post-generation-inc", "off")
)
def test_re_attach_with_stuck_secondary(neon_env_builder: NeonEnvBuilder):
"""
This test assumes that the secondary location cannot be configured for whatever reason.
It then attempts to detach and and attach the tenant back again and, finally, checks
for observed state consistency by attempting to create a timeline.
See LKB-204 for more details.
"""
neon_env_builder.num_pageservers = 2
env = neon_env_builder.init_configs()
env.start()
env.storage_controller.allowed_errors.append(".*failpoint.*")
tenant_id, _ = env.create_tenant(shard_count=1, placement_policy='{"Attached":1}')
env.storage_controller.reconcile_until_idle()
locations = env.storage_controller.locate(tenant_id)
assert len(locations) == 1
primary: int = locations[0]["node_id"]
not_primary = [ps.id for ps in env.pageservers if ps.id != primary]
assert len(not_primary) == 1
secondary = not_primary[0]
env.get_pageserver(secondary).http_client().configure_failpoints(
("put-location-conf-handler", "return(1)")
)
env.storage_controller.tenant_policy_update(tenant_id, {"placement": "Detached"})
with pytest.raises(Exception, match="failpoint"):
env.storage_controller.reconcile_all()
env.storage_controller.tenant_policy_update(tenant_id, {"placement": {"Attached": 1}})
with pytest.raises(Exception, match="failpoint"):
env.storage_controller.reconcile_all()
env.storage_controller.pageserver_api().timeline_create(
pg_version=PgVersion.NOT_SET, tenant_id=tenant_id, new_timeline_id=TimelineId.generate()
)

View File

@@ -28,6 +28,8 @@ chrono = { version = "0.4", default-features = false, features = ["clock", "serd
clap = { version = "4", features = ["derive", "env", "string"] }
clap_builder = { version = "4", default-features = false, features = ["color", "env", "help", "std", "string", "suggestions", "usage"] }
const-oid = { version = "0.9", default-features = false, features = ["db", "std"] }
crossbeam-epoch = { version = "0.9" }
crossbeam-utils = { version = "0.8" }
crypto-bigint = { version = "0.5", features = ["generic-array", "zeroize"] }
der = { version = "0.7", default-features = false, features = ["derive", "flagset", "oid", "pem", "std"] }
deranged = { version = "0.3", default-features = false, features = ["powerfmt", "serde", "std"] }
@@ -73,6 +75,7 @@ num-traits = { version = "0.2", features = ["i128", "libm"] }
once_cell = { version = "1" }
p256 = { version = "0.13", features = ["jwk"] }
parquet = { version = "53", default-features = false, features = ["zstd"] }
portable-atomic = { version = "1", features = ["require-cas"] }
prost = { version = "0.13", features = ["no-recursion-limit", "prost-derive"] }
rand = { version = "0.9" }
regex = { version = "1" }