Compare commits

..

7 Commits

Author SHA1 Message Date
BodoBolero
6f913f2068 fix to use lakebase access token 2025-07-30 17:09:36 +02:00
BodoBolero
bb19dc96f3 change oidc host 2025-07-30 15:10:58 +02:00
BodoBolero
073d46ab80 run workflow before merged into main 2025-07-30 15:05:32 +02:00
BodoBolero
cc26e24bd8 Trigger GitHub to register workflow 2025-07-30 15:03:19 +02:00
BodoBolero
3c8d67ae44 try first version of benchmark on lakebase 2025-07-30 14:44:22 +02:00
BodoBolero
a6cb8a7f72 add project delete and bearer token actions 2025-07-29 17:03:17 +02:00
BodoBolero
01c7de98a2 first draft of project create for lakebase 2025-07-28 16:59:08 +02:00
31 changed files with 433 additions and 450 deletions

View File

@@ -27,6 +27,9 @@ config-variables:
- HETZNER_CACHE_BUCKET
- HETZNER_CACHE_ENDPOINT
- HETZNER_CACHE_REGION
- LAKEBASE_API_HOST
- LAKEBASE_OAUTH_CLIENT_ID
- LAKEBASE_ORG_ID
- NEON_DEV_AWS_ACCOUNT_ID
- NEON_PROD_AWS_ACCOUNT_ID
- PGREGRESS_PG16_PROJECT_ID

View File

@@ -0,0 +1,126 @@
name: 'Create Lakebase Project'
description: 'Create Lakebase Project using API'
inputs:
access_token:
description: 'Lakebase API access token'
required: true
org_id:
description: 'Organization ID, required'
required: true
api_host:
description: 'Lakebase API host, e.g. dbc-55e65913-66de.dev.databricks.com/lakebase-console'
required: true
postgres_version:
description: 'Postgres version; default is 16'
default: '17'
compute_units:
description: '[Min, Max] compute units'
default: '[1, 1]'
psql_path:
description: 'Path to psql binary - it is caller responsibility to provision the psql binary'
required: false
default: '/tmp/neon/pg_install/v16/bin/psql'
libpq_lib_path:
description: 'Path to directory containing libpq library - it is caller responsibility to provision the libpq library'
required: false
default: '/tmp/neon/pg_install/v16/lib'
project_settings:
description: 'A JSON object with project settings'
required: false
default: '{}'
fixed_hostname:
description: 'Fixed hostname to use for connection URI'
required: false
default: 'k8s-dpingres-serverle-09ade1e9e9-0d7f675c53b35938.elb.us-west-2.amazonaws.com'
region_id:
description: 'Project region ID'
required: false
default: 'aws-us-east-2'
outputs:
dsn:
description: 'Created Project DSN (for main database)'
value: ${{ steps.create-neon-project.outputs.dsn }}
project_id:
description: 'Created Project ID'
value: ${{ steps.create-neon-project.outputs.project_id }}
runs:
using: "composite"
steps:
- name: Create Lakebase Project
id: create-lakebase-project
# A shell without `set -x` to not to expose password/dsn in logs
shell: bash -euo pipefail {0}
run: |
res=$(curl \
"https://${API_HOST}/api/v2/projects" \
-w "%{http_code}" \
--header "Accept: application/json" \
--header "Content-Type: application/json" \
--header "Authorization: Bearer ${ACCESS_TOKEN}" \
--data "{
\"project\": {
\"org_id\": \"${ORG_ID}\",
\"name\": \"Created by actions/lakebase-project-create; GITHUB_RUN_ID=${GITHUB_RUN_ID}\",
\"pg_version\": ${POSTGRES_VERSION},
\"region_id\": \"${REGION_ID}\",
\"provisioner\": \"k8s-neonvm\",
\"autoscaling_limit_min_cu\": ${MIN_CU},
\"autoscaling_limit_max_cu\": ${MAX_CU},
\"settings\": ${PROJECT_SETTINGS}
}
}")
code=${res: -3}
if [[ ${code} -ge 400 ]]; then
echo Request failed with error code ${code}
echo ${res::-3}
exit 1
else
project=${res::-3}
fi
# Mask password
echo "::add-mask::$(echo $project | jq --raw-output '.roles[] | select(.name != "web_access") | .password')"
original_dsn=$(echo $project | jq --raw-output '.connection_uris[0].connection_uri')
echo "::add-mask::${original_dsn}"
# Extract endpoint ID from the original hostname
endpoint_id=$(echo "$original_dsn" | sed -n 's/.*@\(ep-[^.]*\)\..*/\1/p')
# Parse original URI components
user_pass=$(echo "$original_dsn" | sed -n 's/postgresql:\/\/\([^@]*\)@.*/\1/p')
database=$(echo "$original_dsn" | sed -n 's/.*\/\([^?]*\).*/\1/p')
# Construct the corrected DSN with fixed hostname and endpoint in options
if [[ "$original_dsn" == *"?"* ]]; then
# Extract existing query parameters
existing_params=$(echo "$original_dsn" | sed -n 's/.*?\(.*\)/\1/p')
dsn="postgresql://${user_pass}@${FIXED_HOSTNAME}/${database}?${existing_params}&options=endpoint%3d${endpoint_id}"
else
dsn="postgresql://${user_pass}@${FIXED_HOSTNAME}/${database}?options=endpoint%3d${endpoint_id}"
fi
echo "::add-mask::${dsn}"
echo "dsn=${dsn}" >> $GITHUB_OUTPUT
project_id=$(echo $project | jq --raw-output '.project.id')
echo "project_id=${project_id}" >> $GITHUB_OUTPUT
echo "Project ${project_id} has been created"
env:
API_HOST: ${{ inputs.api_host }}
ACCESS_TOKEN: ${{ inputs.access_token }}
ORG_ID: ${{ inputs.org_id }}
REGION_ID: ${{ inputs.region_id }}
POSTGRES_VERSION: ${{ inputs.postgres_version }}
MIN_CU: ${{ fromJSON(inputs.compute_units)[0] }}
MAX_CU: ${{ fromJSON(inputs.compute_units)[1] }}
PSQL: ${{ inputs.psql_path }}
LD_LIBRARY_PATH: ${{ inputs.libpq_lib_path }}
PROJECT_SETTINGS: ${{ inputs.project_settings }}
FIXED_HOSTNAME: ${{ inputs.fixed_hostname }}

View File

@@ -0,0 +1,39 @@
name: 'Delete Neon Project'
description: 'Delete Neon Project using API'
inputs:
access_token:
description: 'Lakebase API access token'
required: true
org_id:
description: 'Organization ID, required'
required: true
api_host:
description: 'Lakebase API host, e.g. dbc-55e65913-66de.dev.databricks.com/ajax-api/2.0/lakebase-console'
required: true
project_id:
description: 'ID of the Project to delete'
required: true
runs:
using: "composite"
steps:
- name: Delete Lakebase Project
# Do not try to delete a project if .github/actions/neon-project-create failed before
if: ${{ inputs.project_id != '' }}
shell: bash -euxo pipefail {0}
run: |
curl \
"https://${API_HOST}/api/v2/projects/${PROJECT_ID}" \
--fail \
--request DELETE \
--header "Accept: application/json" \
--header "Content-Type: application/json" \
--header "Authorization: Bearer ${ACCESS_TOKEN}"
echo "Project ${PROJECT_ID} has been deleted"
env:
API_HOST: ${{ inputs.api_host }}
ACCESS_TOKEN: ${{ inputs.access_token }}
ORG_ID: ${{ inputs.org_id }}
PROJECT_ID: ${{ inputs.project_id }}

View File

@@ -0,0 +1,166 @@
name: Lakebase Benchmarking
on:
# uncomment to run on push for debugging your PR
push:
branches: [ bodobolero/lakebase_perf_tests ]
workflow_dispatch: # adds ability to run this manually
inputs:
postgres_version:
description: 'Postgres version'
required: false
default: '17'
save_perf_report:
type: boolean
description: 'Publish perf report'
required: false
default: false
defaults:
run:
shell: bash -euxo pipefail {0}
concurrency:
# Allow only one workflow per any non-`main` branch.
group: ${{ github.workflow }}-${{ github.ref_name }}-${{ github.ref_name == 'main' && github.sha || 'anysha' }}
cancel-in-progress: true
jobs:
lakebase-pgbench:
permissions:
contents: write
statuses: write
id-token: write # aws-actions/configure-aws-credentials
env:
TEST_PG_BENCH_DURATIONS_MATRIX: "60m"
TEST_PG_BENCH_SCALES_MATRIX: "10gb"
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
PG_VERSION: ${{ github.event.inputs.postgres_version || '17' }}
TEST_OUTPUT: /tmp/test_output
BUILD_TYPE: remote
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || false }}
PLATFORM: "lakebase-captest-new"
# TODO: for lakehouse test-shard which is probably deployed in US-West we need to change the runner
# to us-west to get correct OLTP latencies due to added speed of light latency
runs-on: [ self-hosted, us-east-2, x64 ]
container:
image: ghcr.io/neondatabase/build-tools:pinned-bookworm
credentials:
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
options: --init
# Increase timeout to 8h, default timeout is 6h
timeout-minutes: 480
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
with:
egress-policy: audit
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@e3dd6a429d7300a6a4c196c26e071d42e0343502 # v4.0.2
with:
aws-region: eu-central-1
role-to-assume: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
role-duration-seconds: 18000 # 5 hours
- name: Download Neon artifact
uses: ./.github/actions/download
with:
name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact
path: /tmp/neon/
prefix: latest
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
## TODO, currently we cannot really specify a small min max CU for lakebase project
## and the semantic of a CU is different from Neon, so we need to carefully map
## compute sizes before comparing results
- name: Create Lakebase Project
id: create-lakebase-project
uses: ./.github/actions/lakebase-project-create
with:
api_host: ${{ vars.LAKEBASE_API_HOST }}
org_id: ${{ vars.LAKEBASE_ORG_ID }}
postgres_version: ${{ env.PG_VERSION }}
compute_units: '[1, 1]'
access_token: ${{ secrets.LAKEBASE_ACCESS_TOKEN }}
- name: Benchmark init
uses: ./.github/actions/run-python-test-set
with:
build_type: ${{ env.BUILD_TYPE }}
test_selection: performance
run_in_parallel: false
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 21600 -k test_pgbench_remote_init
pg_version: ${{ env.PG_VERSION }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
BENCHMARK_CONNSTR: ${{ steps.create-lakebase-project.outputs.dsn }}
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
- name: Benchmark simple-update
uses: ./.github/actions/run-python-test-set
with:
build_type: ${{ env.BUILD_TYPE }}
test_selection: performance
run_in_parallel: false
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 21600 -k test_pgbench_remote_simple_update
pg_version: ${{ env.PG_VERSION }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
BENCHMARK_CONNSTR: ${{ steps.create-lakebase-project.outputs.dsn }}
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
- name: Benchmark select-only
uses: ./.github/actions/run-python-test-set
with:
build_type: ${{ env.BUILD_TYPE }}
test_selection: performance
run_in_parallel: false
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 21600 -k test_pgbench_remote_select_only
pg_version: ${{ env.PG_VERSION }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
BENCHMARK_CONNSTR: ${{ steps.create-lakebase-project.outputs.dsn }}
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
- name: Delete Lakebase Project
if: ${{ steps.create-lakebase-project.outputs.project_id && always() }}
uses: ./.github/actions/lakebase-project-delete
with:
api_host: ${{ vars.LAKEBASE_API_HOST }}
org_id: ${{ vars.LAKEBASE_ORG_ID }}
project_id: ${{ steps.create-lakebase-project.outputs.project_id }}
access_token: ${{ secrets.LAKEBASE_ACCESS_TOKEN }}
- name: Create Allure report
id: create-allure-report
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report-generate
with:
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Post to a Slack channel
if: ${{ failure() }}
uses: slackapi/slack-github-action@fcfb566f8b0aab22203f066d80ca1d7e4b5d05b3 # v1.27.1
with:
channel-id: "C06KHQVQ7U3" # on-call-qa-staging-stream
slack-message: |
Lakebase perf testing: ${{ job.status }}
<${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|GitHub Run>
<${{ steps.create-allure-report.outputs.report-url }}|Allure report>
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}

View File

@@ -48,20 +48,8 @@ jobs:
uses: ./.github/workflows/build-build-tools-image.yml
secrets: inherit
generate-ch-tmppw:
runs-on: ubuntu-22.04
outputs:
tmp_val: ${{ steps.pwgen.outputs.tmp_val }}
steps:
- name: Generate a random password
id: pwgen
run: |
set +x
p=$(dd if=/dev/random bs=14 count=1 2>/dev/null | base64)
echo tmp_val="${p//\//}" >> "${GITHUB_OUTPUT}"
test-logical-replication:
needs: [ build-build-tools-image, generate-ch-tmppw ]
needs: [ build-build-tools-image ]
runs-on: ubuntu-22.04
container:
@@ -72,20 +60,16 @@ jobs:
options: --init --user root
services:
clickhouse:
image: clickhouse/clickhouse-server:24.8
env:
CLICKHOUSE_PASSWORD: ${{ needs.generate-ch-tmppw.outputs.tmp_val }}
image: clickhouse/clickhouse-server:24.6.3.64
ports:
- 9000:9000
- 8123:8123
zookeeper:
image: quay.io/debezium/zookeeper:3.1.3.Final
image: quay.io/debezium/zookeeper:2.7
ports:
- 2181:2181
- 2888:2888
- 3888:3888
kafka:
image: quay.io/debezium/kafka:3.1.3.Final
image: quay.io/debezium/kafka:2.7
env:
ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
@@ -95,7 +79,7 @@ jobs:
ports:
- 9092:9092
debezium:
image: quay.io/debezium/connect:3.1.3.Final
image: quay.io/debezium/connect:2.7
env:
BOOTSTRAP_SERVERS: kafka:9092
GROUP_ID: 1
@@ -141,7 +125,6 @@ jobs:
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
BENCHMARK_CONNSTR: ${{ steps.create-neon-project.outputs.dsn }}
CLICKHOUSE_PASSWORD: ${{ needs.generate-ch-tmppw.outputs.tmp_val }}
- name: Delete Neon Project
if: always()

View File

@@ -52,7 +52,7 @@ pub(crate) fn regenerate(
};
// Express a static value for how many shards we may schedule on one node
const MAX_SHARDS: u32 = 2500;
const MAX_SHARDS: u32 = 5000;
let mut doc = PageserverUtilization {
disk_usage_bytes: used,

View File

@@ -48,8 +48,6 @@ DATA = \
neon--1.3--1.4.sql \
neon--1.4--1.5.sql \
neon--1.5--1.6.sql \
neon--1.6--1.7.sql \
neon--1.7--1.6.sql \
neon--1.6--1.5.sql \
neon--1.5--1.4.sql \
neon--1.4--1.3.sql \

View File

@@ -79,6 +79,10 @@
#include "access/xlogrecovery.h"
#endif
#if PG_VERSION_NUM < 160000
typedef PGAlignedBlock PGIOAlignedBlock;
#endif
#define NEON_PANIC_CONNECTION_STATE(shard_no, elvl, message, ...) \
neon_shard_log(shard_no, elvl, "Broken connection state: " message, \
##__VA_ARGS__)
@@ -260,7 +264,7 @@ typedef struct PrefetchState
/* the buffers */
prfh_hash *prf_hash;
int max_unflushed_shard_no;
int max_shard_no;
/* Mark shards involved in prefetch */
uint8 shard_bitmap[(MAX_SHARDS + 7)/8];
PrefetchRequest prf_buffer[]; /* prefetch buffers */
@@ -300,7 +304,6 @@ static void prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_
static bool prefetch_wait_for(uint64 ring_index);
static void prefetch_cleanup_trailing_unused(void);
static inline void prefetch_set_unused(uint64 ring_index);
static bool prefetch_flush_requests(void);
static bool neon_prefetch_response_usable(neon_request_lsns *request_lsns,
PrefetchRequest *slot);
@@ -470,26 +473,13 @@ communicator_prefetch_pump_state(void)
{
START_PREFETCH_RECEIVE_WORK();
if (MyPState->ring_receive == MyPState->ring_flush && MyPState->ring_flush < MyPState->ring_unused)
{
/*
* Flush request to avoid requests pending for arbitrary long time,
* pinning LSN and holding GC at PS.
*/
if (!prefetch_flush_requests())
{
END_PREFETCH_RECEIVE_WORK();
return;
}
}
while (MyPState->ring_receive != MyPState->ring_flush)
{
NeonResponse *response;
PrefetchRequest *slot;
MemoryContext old;
uint64 my_ring_index = MyPState->ring_receive;
slot = GetPrfSlot(my_ring_index);
slot = GetPrfSlot(MyPState->ring_receive);
old = MemoryContextSwitchTo(MyPState->errctx);
response = page_server->try_receive(slot->shard_no);
@@ -503,12 +493,12 @@ communicator_prefetch_pump_state(void)
/* The slot should still be valid */
if (slot->status != PRFS_REQUESTED ||
slot->response != NULL ||
slot->my_ring_index != my_ring_index)
slot->my_ring_index != MyPState->ring_receive)
{
neon_shard_log(slot->shard_no, PANIC,
"Incorrect prefetch slot state after receive: status=%d response=%p my=" UINT64_FORMAT " receive=" UINT64_FORMAT "",
slot->status, slot->response,
slot->my_ring_index, my_ring_index);
slot->my_ring_index, MyPState->ring_receive);
}
/* update prefetch state */
MyPState->n_responses_buffered += 1;
@@ -536,19 +526,6 @@ communicator_prefetch_pump_state(void)
END_PREFETCH_RECEIVE_WORK();
if (RecoveryInProgress())
{
/*
* Update backend's min in-flight prefetch LSN.
*/
XLogRecPtr min_backend_prefetch_lsn = last_replay_lsn != InvalidXLogRecPtr ? last_replay_lsn : GetXLogReplayRecPtr(NULL);
for (uint64_t ring_index = MyPState->ring_receive; ring_index < MyPState->ring_unused; ring_index++)
{
PrefetchRequest* slot = GetPrfSlot(ring_index);
min_backend_prefetch_lsn = Min(slot->request_lsns.request_lsn, min_backend_prefetch_lsn);
}
MIN_BACKEND_REQUEST_LSN = min_backend_prefetch_lsn;
}
communicator_reconfigure_timeout_if_needed();
}
@@ -588,7 +565,7 @@ readahead_buffer_resize(int newsize, void *extra)
newPState->ring_last = newsize;
newPState->ring_unused = newsize;
newPState->ring_receive = newsize;
newPState->max_unflushed_shard_no = MyPState->max_unflushed_shard_no;
newPState->max_shard_no = MyPState->max_shard_no;
memcpy(newPState->shard_bitmap, MyPState->shard_bitmap, sizeof(MyPState->shard_bitmap));
/*
@@ -688,7 +665,6 @@ consume_prefetch_responses(void)
{
if (MyPState->ring_receive < MyPState->ring_unused)
prefetch_wait_for(MyPState->ring_unused - 1);
/*
* We know for sure we're not working on any prefetch pages after
* this.
@@ -718,7 +694,7 @@ prefetch_cleanup_trailing_unused(void)
static bool
prefetch_flush_requests(void)
{
for (shardno_t shard_no = 0; shard_no < MyPState->max_unflushed_shard_no; shard_no++)
for (shardno_t shard_no = 0; shard_no < MyPState->max_shard_no; shard_no++)
{
if (BITMAP_ISSET(MyPState->shard_bitmap, shard_no))
{
@@ -727,8 +703,7 @@ prefetch_flush_requests(void)
BITMAP_CLR(MyPState->shard_bitmap, shard_no);
}
}
MyPState->max_unflushed_shard_no = 0;
MyPState->ring_flush = MyPState->ring_unused;
MyPState->max_shard_no = 0;
return true;
}
@@ -752,6 +727,7 @@ prefetch_wait_for(uint64 ring_index)
{
if (!prefetch_flush_requests())
return false;
MyPState->ring_flush = MyPState->ring_unused;
}
Assert(MyPState->ring_unused > ring_index);
@@ -830,7 +806,6 @@ prefetch_read(PrefetchRequest *slot)
old = MemoryContextSwitchTo(MyPState->errctx);
response = (NeonResponse *) page_server->receive(shard_no);
MemoryContextSwitchTo(old);
if (response)
{
check_getpage_response(slot, response);
@@ -1039,16 +1014,11 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
Assert(mySlotNo == MyPState->ring_unused);
if (force_request_lsns)
{
slot->request_lsns = *force_request_lsns;
}
else
{
neon_get_request_lsns(BufTagGetNRelFileInfo(slot->buftag),
slot->buftag.forkNum, slot->buftag.blockNum,
&slot->request_lsns, 1);
last_replay_lsn = InvalidXLogRecPtr;
}
request.hdr.lsn = slot->request_lsns.request_lsn;
request.hdr.not_modified_since = slot->request_lsns.not_modified_since;
@@ -1067,7 +1037,7 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
MyPState->n_unused -= 1;
MyPState->ring_unused += 1;
BITMAP_SET(MyPState->shard_bitmap, slot->shard_no);
MyPState->max_unflushed_shard_no = Max(slot->shard_no+1, MyPState->max_unflushed_shard_no);
MyPState->max_shard_no = Max(slot->shard_no+1, MyPState->max_shard_no);
/* update slot state */
slot->status = PRFS_REQUESTED;
@@ -1075,25 +1045,6 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
Assert(!found);
}
/*
* Check that returned page LSN is consistent with request lsns
*/
static void
check_page_lsn(NeonGetPageResponse* resp)
{
if (neon_protocol_version < 3) /* no information to check */
return;
if (PageGetLSN(resp->page) > resp->req.hdr.not_modified_since)
neon_log(PANIC, "Invalid getpage response version: %X/%08X is higher than last modified LSN %X/%08X",
LSN_FORMAT_ARGS(PageGetLSN(resp->page)),
LSN_FORMAT_ARGS(resp->req.hdr.not_modified_since));
if (PageGetLSN(resp->page) > resp->req.hdr.lsn)
neon_log(PANIC, "Invalid getpage response version: %X/%08X is higher than request LSN %X/%08X",
LSN_FORMAT_ARGS(PageGetLSN(resp->page)),
LSN_FORMAT_ARGS(resp->req.hdr.lsn));
}
/*
* Lookup of already received prefetch requests. Only already received responses matching required LSNs are accepted.
* Present pages are marked in "mask" bitmap and total number of such pages is returned.
@@ -1117,7 +1068,7 @@ communicator_prefetch_lookupv(NRelFileInfo rinfo, ForkNumber forknum, BlockNumbe
for (int i = 0; i < nblocks; i++)
{
PrfHashEntry *entry;
NeonGetPageResponse* resp;
hashkey.buftag.blockNum = blocknum + i;
entry = prfh_lookup(MyPState->prf_hash, &hashkey);
@@ -1150,9 +1101,8 @@ communicator_prefetch_lookupv(NRelFileInfo rinfo, ForkNumber forknum, BlockNumbe
continue;
}
Assert(slot->response->tag == T_NeonGetPageResponse); /* checked by check_getpage_response when response was assigned to the slot */
resp = (NeonGetPageResponse*)slot->response;
check_page_lsn(resp);
memcpy(buffers[i], resp->page, BLCKSZ);
memcpy(buffers[i], ((NeonGetPageResponse*)slot->response)->page, BLCKSZ);
/*
* With lfc_store_prefetch_result=true prefetch result is stored in LFC in prefetch_pump_state when response is received
@@ -1445,6 +1395,7 @@ Retry:
*/
goto Retry;
}
MyPState->ring_flush = MyPState->ring_unused;
}
return last_ring_index;
@@ -1514,12 +1465,10 @@ page_server_request(void const *req)
MyNeonCounters->pageserver_open_requests--;
} while (resp == NULL);
cancel_before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
last_replay_lsn = InvalidXLogRecPtr;
}
PG_CATCH();
{
cancel_before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
last_replay_lsn = InvalidXLogRecPtr;
/* Nothing should cancel disconnect: we should not leave connection in opaque state */
HOLD_INTERRUPTS();
page_server->disconnect(shard_no);
@@ -1919,13 +1868,6 @@ nm_to_string(NeonMessage *msg)
return s.data;
}
static void
reset_min_request_lsn(int code, Datum arg)
{
if (MyProcNumber != -1)
MIN_BACKEND_REQUEST_LSN = InvalidXLogRecPtr;
}
/*
* communicator_init() -- Initialize per-backend private state
*/
@@ -1937,8 +1879,6 @@ communicator_init(void)
if (MyPState != NULL)
return;
before_shmem_exit(reset_min_request_lsn, 0);
/*
* Sanity check that theperf counters array is sized correctly. We got
* this wrong once, and the formula for max number of backends and aux
@@ -1948,7 +1888,7 @@ communicator_init(void)
* the check here. That's OK, we don't expect the logic to change in old
* releases.
*/
#if PG_MAJORVERSION_NUM >= 15
#if PG_VERSION_NUM>=150000
if (MyNeonCounters >= &neon_per_backend_counters_shared[NUM_NEON_PERF_COUNTER_SLOTS])
elog(ERROR, "MyNeonCounters points past end of array");
#endif
@@ -2287,7 +2227,6 @@ Retry:
case T_NeonGetPageResponse:
{
NeonGetPageResponse* getpage_resp = (NeonGetPageResponse *) resp;
check_page_lsn(getpage_resp);
memcpy(buffer, getpage_resp->page, BLCKSZ);
/*
@@ -2564,30 +2503,12 @@ communicator_reconfigure_timeout_if_needed(void)
!AmPrewarmWorker && /* do not pump prefetch state in prewarm worker */
readahead_getpage_pull_timeout_ms > 0;
if (!needs_set && MIN_BACKEND_REQUEST_LSN != InvalidXLogRecPtr)
{
if (last_replay_lsn == InvalidXLogRecPtr)
MIN_BACKEND_REQUEST_LSN = InvalidXLogRecPtr;
else
needs_set = true; /* Can not reset MIN_BACKEND_REQUEST_LSN now, have to do it later */
}
if (needs_set != timeout_set)
{
/*
* The background writer/checkpointer doens't (shouldn't) read any pages.
* And definitely they should not run on replica.
* The only case when we can get here is replica promotion.
*/
if (AmBackgroundWriterProcess() || AmCheckpointerProcess())
{
MIN_BACKEND_REQUEST_LSN = InvalidXLogRecPtr;
if (timeout_set)
{
disable_timeout(PS_TIMEOUT_ID, false);
timeout_set = false;
}
return;
}
/* The background writer doens't (shouldn't) read any pages */
Assert(!AmBackgroundWriterProcess());
/* The checkpointer doens't (shouldn't) read any pages */
Assert(!AmCheckpointerProcess());
if (unlikely(PS_TIMEOUT_ID == 0))
{
@@ -2620,6 +2541,14 @@ communicator_reconfigure_timeout_if_needed(void)
static void
pagestore_timeout_handler(void)
{
#if PG_MAJORVERSION_NUM <= 14
/*
* PG14: Setting a repeating timeout is not possible, so we signal here
* that the timeout has already been reset, and by telling the system
* that system will re-schedule it later if we need to.
*/
timeout_set = false;
#endif
timeout_signaled = true;
InterruptPending = true;
}
@@ -2639,14 +2568,6 @@ communicator_processinterrupts(void)
if (!readpage_reentrant_guard && readahead_getpage_pull_timeout_ms > 0)
communicator_prefetch_pump_state();
#if PG_MAJORVERSION_NUM <= 14
/*
* PG14: Setting a repeating timeout is not possible, so we signal here
* that the timeout has already been reset, and by telling the system
* that system will re-schedule it later if we need to.
*/
timeout_set = false;
#endif
timeout_signaled = false;
communicator_reconfigure_timeout_if_needed();
}
@@ -2656,28 +2577,3 @@ communicator_processinterrupts(void)
return prev_interrupt_cb();
}
PG_FUNCTION_INFO_V1(neon_communicator_min_inflight_request_lsn);
Datum
neon_communicator_min_inflight_request_lsn(PG_FUNCTION_ARGS)
{
if (RecoveryInProgress())
{
/* Do not hold GC for primary */
PG_RETURN_INT64(UINT64_MAX);
}
else
{
XLogRecPtr min_lsn = GetXLogReplayRecPtr(NULL);
size_t n_procs = ProcGlobal->allProcCount;
for (size_t i = 0; i < n_procs; i++)
{
if (neon_per_backend_counters_shared[i].min_request_lsn != InvalidXLogRecPtr)
{
min_lsn = Min(min_lsn, neon_per_backend_counters_shared[i].min_request_lsn);
}
}
PG_RETURN_INT64(min_lsn);
}
}

View File

@@ -635,11 +635,6 @@ lfc_init(void)
NULL);
}
/*
* Dump a list of pages that are currently in the LFC
*
* This is used to get a snapshot that can be used to prewarm the LFC later.
*/
FileCacheState*
lfc_get_state(size_t max_entries)
{
@@ -2272,3 +2267,4 @@ get_prewarm_info(PG_FUNCTION_ARGS)
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
}

View File

@@ -1,3 +0,0 @@
create function neon_communicator_min_inflight_request_lsn() returns pg_catalog.pg_lsn
AS 'MODULE_PATHNAME', 'neon_communicator_min_inflight_request_lsn'
LANGUAGE C;

View File

@@ -1 +0,0 @@
drop function neon_communicator_min_inflight_request_lsn();

View File

@@ -1,7 +1,7 @@
/*-------------------------------------------------------------------------
*
* neon.c
* Main entry point into the neon extension
* Main entry point into the neon exension
*
*-------------------------------------------------------------------------
*/
@@ -508,7 +508,7 @@ _PG_init(void)
DefineCustomBoolVariable(
"neon.disable_logical_replication_subscribers",
"Disable incoming logical replication",
"Disables incomming logical replication",
NULL,
&disable_logical_replication_subscribers,
false,
@@ -567,7 +567,7 @@ _PG_init(void)
DefineCustomEnumVariable(
"neon.debug_compare_local",
"Debug mode for comparing content of pages in prefetch ring/LFC/PS and local disk",
"Debug mode for compaing content of pages in prefetch ring/LFC/PS and local disk",
NULL,
&debug_compare_local,
DEBUG_COMPARE_LOCAL_NONE,
@@ -735,6 +735,7 @@ neon_shmem_request_hook(void)
static void
neon_shmem_startup_hook(void)
{
/* Initialize */
if (prev_shmem_startup_hook)
prev_shmem_startup_hook();

View File

@@ -42,6 +42,7 @@ NeonPerfCountersShmemRequest(void)
}
void
NeonPerfCountersShmemInit(void)
{

View File

@@ -154,11 +154,6 @@ typedef struct
* Histogram of query execution time.
*/
QTHistogramData query_time_hist;
/*
* Minimal LSN of in-fligth request requests
*/
XLogRecPtr min_request_lsn;
} neon_per_backend_counters;
/* Pointer to the shared memory array of neon_per_backend_counters structs */
@@ -172,13 +167,11 @@ extern neon_per_backend_counters *neon_per_backend_counters_shared;
*/
#define NUM_NEON_PERF_COUNTER_SLOTS (MaxBackends + NUM_AUXILIARY_PROCS)
#if PG_VERSION_NUM >= 170000
#define MyNeonCounters (&neon_per_backend_counters_shared[MyProcNumber])
/*
* Backend-local minimal in-flight request LSN.
* We store it in neon_per_backend_counters_shared and not in separate array to minimize false cache sharing
*/
#define MIN_BACKEND_REQUEST_LSN MyNeonCounters->min_request_lsn
#else
#define MyNeonCounters (&neon_per_backend_counters_shared[MyProc->pgprocno])
#endif
extern void inc_getpage_wait(uint64 latency);
extern void inc_page_cache_read_wait(uint64 latency);

View File

@@ -9,10 +9,6 @@
#include "fmgr.h"
#include "storage/buf_internals.h"
#if PG_MAJORVERSION_NUM < 16
typedef PGAlignedBlock PGIOAlignedBlock;
#endif
#if PG_MAJORVERSION_NUM < 17
#define NRelFileInfoBackendIsTemp(rinfo) (rinfo.backend != InvalidBackendId)
#else
@@ -162,10 +158,6 @@ InitBufferTag(BufferTag *tag, const RelFileNode *rnode,
#define AmAutoVacuumWorkerProcess() (IsAutoVacuumWorkerProcess())
#endif
#if PG_MAJORVERSION_NUM < 17
#define MyProcNumber (MyProc - &ProcGlobal->allProcs[0])
#endif
#if PG_MAJORVERSION_NUM < 15
extern void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags);
extern TimeLineID GetWALInsertionTimeLine(void);

View File

@@ -243,7 +243,6 @@ extern char *neon_timeline;
extern char *neon_tenant;
extern int32 max_cluster_size;
extern int neon_protocol_version;
extern XLogRecPtr last_replay_lsn;
extern shardno_t get_shard_number(BufferTag* tag);

View File

@@ -72,6 +72,10 @@
#include "access/xlogrecovery.h"
#endif
#if PG_VERSION_NUM < 160000
typedef PGAlignedBlock PGIOAlignedBlock;
#endif
#include "access/nbtree.h"
#include "storage/bufpage.h"
#include "access/xlog_internal.h"
@@ -96,8 +100,6 @@ typedef enum
int debug_compare_local;
XLogRecPtr last_replay_lsn;
static NRelFileInfo unlogged_build_rel_info;
static UnloggedBuildPhase unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
@@ -161,7 +163,7 @@ log_newpages_copy(NRelFileInfo * rinfo, ForkNumber forkNum, BlockNumber blkno,
page_std);
}
return GetXLogInsertRecPtr();
return ProcLastRecPtr;
}
#endif /* PG_MAJORVERSION_NUM >= 17 */
@@ -590,17 +592,6 @@ neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
/* Request the page at the end of the last fully replayed LSN. */
XLogRecPtr replay_lsn = GetXLogReplayRecPtr(NULL);
if (MIN_BACKEND_REQUEST_LSN == InvalidXLogRecPtr)
{
/* mark the backend's replay_lsn as "we have a request ongoing", blocking the expiration of any current LSN */
MIN_BACKEND_REQUEST_LSN = replay_lsn;
/* make sure memory operations are in correct order, even in concurrent systems */
pg_memory_barrier();
/* get the current LSN to register */
replay_lsn = GetXLogReplayRecPtr(NULL);
MIN_BACKEND_REQUEST_LSN = replay_lsn;
}
last_replay_lsn = replay_lsn;
for (int i = 0; i < nblocks; i++)
{
neon_request_lsns *result = &output[i];

View File

@@ -13,7 +13,6 @@
#include "neon.h"
#include "neon_pgversioncompat.h"
#include "miscadmin.h"
#include "pagestore_client.h"
#include RELFILEINFO_HDR
#include "storage/smgr.h"
@@ -24,6 +23,10 @@
#include "utils/dynahash.h"
#include "utils/guc.h"
#if PG_VERSION_NUM >= 150000
#include "miscadmin.h"
#endif
typedef struct
{
NRelFileInfo rinfo;

View File

@@ -535,7 +535,12 @@ pub async fn run() -> anyhow::Result<()> {
// add a task to flush the db_schema cache every 10 minutes
#[cfg(feature = "rest_broker")]
if let Some(db_schema_cache) = &config.rest_config.db_schema_cache {
maintenance_tasks.spawn(db_schema_cache.maintain());
maintenance_tasks.spawn(async move {
loop {
tokio::time::sleep(Duration::from_secs(600)).await;
db_schema_cache.0.run_pending_tasks();
}
});
}
if let Some(metrics_config) = &config.metric_collection {

View File

@@ -2,12 +2,8 @@ use std::ops::{Deref, DerefMut};
use std::time::{Duration, Instant};
use moka::Expiry;
use moka::notification::RemovalCause;
use crate::control_plane::messages::ControlPlaneErrorMessage;
use crate::metrics::{
CacheEviction, CacheKind, CacheOutcome, CacheOutcomeGroup, CacheRemovalCause, Metrics,
};
/// Default TTL used when caching errors from control plane.
pub const DEFAULT_ERROR_TTL: Duration = Duration::from_secs(30);
@@ -134,35 +130,3 @@ impl<K, V> Expiry<K, ControlPlaneResult<V>> for CplaneExpiry {
self.expire_early(value, updated_at)
}
}
pub fn eviction_listener(kind: CacheKind, cause: RemovalCause) {
let cause = match cause {
RemovalCause::Expired => CacheRemovalCause::Expired,
RemovalCause::Explicit => CacheRemovalCause::Explicit,
RemovalCause::Replaced => CacheRemovalCause::Replaced,
RemovalCause::Size => CacheRemovalCause::Size,
};
Metrics::get()
.cache
.evicted_total
.inc(CacheEviction { cache: kind, cause });
}
#[inline]
pub fn count_cache_outcome<T>(kind: CacheKind, cache_result: Option<T>) -> Option<T> {
let outcome = if cache_result.is_some() {
CacheOutcome::Hit
} else {
CacheOutcome::Miss
};
Metrics::get().cache.request_total.inc(CacheOutcomeGroup {
cache: kind,
outcome,
});
cache_result
}
#[inline]
pub fn count_cache_insert(kind: CacheKind) {
Metrics::get().cache.inserted_total.inc(kind);
}

View File

@@ -1,8 +1,7 @@
use crate::cache::common::{Cache, count_cache_insert, count_cache_outcome, eviction_listener};
use crate::cache::common::Cache;
use crate::cache::{Cached, ControlPlaneResult, CplaneExpiry};
use crate::config::CacheOptions;
use crate::control_plane::NodeInfo;
use crate::metrics::{CacheKind, Metrics};
use crate::types::EndpointCacheKey;
pub(crate) struct NodeInfoCache(moka::sync::Cache<EndpointCacheKey, ControlPlaneResult<NodeInfo>>);
@@ -20,30 +19,18 @@ impl Cache for NodeInfoCache {
impl NodeInfoCache {
pub fn new(config: CacheOptions) -> Self {
let builder = moka::sync::Cache::builder()
.name("node_info")
.name("node_info_cache")
.expire_after(CplaneExpiry::default());
let builder = config.moka(builder);
if let Some(size) = config.size {
Metrics::get()
.cache
.capacity
.set(CacheKind::NodeInfo, size as i64);
}
let builder = builder
.eviction_listener(|_k, _v, cause| eviction_listener(CacheKind::NodeInfo, cause));
Self(builder.build())
}
pub fn insert(&self, key: EndpointCacheKey, value: ControlPlaneResult<NodeInfo>) {
count_cache_insert(CacheKind::NodeInfo);
self.0.insert(key, value);
}
pub fn get(&self, key: &EndpointCacheKey) -> Option<ControlPlaneResult<NodeInfo>> {
count_cache_outcome(CacheKind::NodeInfo, self.0.get(key))
pub fn get(&'static self, key: &EndpointCacheKey) -> Option<ControlPlaneResult<NodeInfo>> {
self.0.get(key)
}
pub fn get_entry(

View File

@@ -5,14 +5,11 @@ use clashmap::ClashMap;
use moka::sync::Cache;
use tracing::{debug, info};
use crate::cache::common::{
ControlPlaneResult, CplaneExpiry, count_cache_insert, count_cache_outcome, eviction_listener,
};
use crate::cache::common::{ControlPlaneResult, CplaneExpiry};
use crate::config::ProjectInfoCacheOptions;
use crate::control_plane::messages::{ControlPlaneErrorMessage, Reason};
use crate::control_plane::{EndpointAccessControl, RoleAccessControl};
use crate::intern::{AccountIdInt, EndpointIdInt, ProjectIdInt, RoleNameInt};
use crate::metrics::{CacheKind, Metrics};
use crate::types::{EndpointId, RoleName};
/// Cache for project info.
@@ -85,32 +82,17 @@ impl ProjectInfoCache {
impl ProjectInfoCache {
pub(crate) fn new(config: ProjectInfoCacheOptions) -> Self {
Metrics::get().cache.capacity.set(
CacheKind::ProjectInfoRoles,
(config.size * config.max_roles) as i64,
);
Metrics::get()
.cache
.capacity
.set(CacheKind::ProjectInfoEndpoints, config.size as i64);
// we cache errors for 30 seconds, unless retry_at is set.
let expiry = CplaneExpiry::default();
Self {
role_controls: Cache::builder()
.name("project_info_roles")
.eviction_listener(|_k, _v, cause| {
eviction_listener(CacheKind::ProjectInfoRoles, cause);
})
.name("role_access_controls")
.max_capacity(config.size * config.max_roles)
.time_to_live(config.ttl)
.expire_after(expiry)
.build(),
ep_controls: Cache::builder()
.name("project_info_endpoints")
.eviction_listener(|_k, _v, cause| {
eviction_listener(CacheKind::ProjectInfoEndpoints, cause);
})
.name("endpoint_access_controls")
.max_capacity(config.size)
.time_to_live(config.ttl)
.expire_after(expiry)
@@ -129,10 +111,7 @@ impl ProjectInfoCache {
let endpoint_id = EndpointIdInt::get(endpoint_id)?;
let role_name = RoleNameInt::get(role_name)?;
count_cache_outcome(
CacheKind::ProjectInfoRoles,
self.role_controls.get(&(endpoint_id, role_name)),
)
self.role_controls.get(&(endpoint_id, role_name))
}
pub(crate) fn get_endpoint_access(
@@ -141,10 +120,7 @@ impl ProjectInfoCache {
) -> Option<ControlPlaneResult<EndpointAccessControl>> {
let endpoint_id = EndpointIdInt::get(endpoint_id)?;
count_cache_outcome(
CacheKind::ProjectInfoEndpoints,
self.ep_controls.get(&endpoint_id),
)
self.ep_controls.get(&endpoint_id)
}
pub(crate) fn insert_endpoint_access(
@@ -168,9 +144,6 @@ impl ProjectInfoCache {
"created a cache entry for endpoint access"
);
count_cache_insert(CacheKind::ProjectInfoEndpoints);
count_cache_insert(CacheKind::ProjectInfoRoles);
self.ep_controls.insert(endpoint_id, Ok(controls));
self.role_controls
.insert((endpoint_id, role_name), Ok(role_controls));
@@ -199,14 +172,10 @@ impl ProjectInfoCache {
// leave the entry alone if it's already Ok
Some(entry) if entry.value().is_ok() => moka::ops::compute::Op::Nop,
// replace the entry
_ => {
count_cache_insert(CacheKind::ProjectInfoEndpoints);
moka::ops::compute::Op::Put(Err(msg.clone()))
}
_ => moka::ops::compute::Op::Put(Err(msg.clone())),
});
}
count_cache_insert(CacheKind::ProjectInfoRoles);
self.role_controls
.insert((endpoint_id, role_name), Err(msg));
}

View File

@@ -8,8 +8,8 @@ use measured::label::{
use measured::metric::histogram::Thresholds;
use measured::metric::name::MetricName;
use measured::{
Counter, CounterVec, FixedCardinalityLabel, Gauge, GaugeVec, Histogram, HistogramVec,
LabelGroup, MetricGroup,
Counter, CounterVec, FixedCardinalityLabel, Gauge, Histogram, HistogramVec, LabelGroup,
MetricGroup,
};
use metrics::{CounterPairAssoc, CounterPairVec, HyperLogLogVec, InfoMetric};
use tokio::time::{self, Instant};
@@ -29,9 +29,6 @@ pub struct Metrics {
#[metric(namespace = "service")]
pub service: ServiceMetrics,
#[metric(namespace = "cache")]
pub cache: CacheMetrics,
}
static SELF: OnceLock<Metrics> = OnceLock::new();
@@ -222,6 +219,13 @@ pub enum Bool {
False,
}
#[derive(FixedCardinalityLabel, Copy, Clone)]
#[label(singleton = "outcome")]
pub enum CacheOutcome {
Hit,
Miss,
}
#[derive(LabelGroup)]
#[label(set = ConsoleRequestSet)]
pub struct ConsoleRequest<'a> {
@@ -700,59 +704,3 @@ pub enum ServiceState {
Running,
Terminating,
}
#[derive(MetricGroup)]
#[metric(new())]
pub struct CacheMetrics {
/// The capacity of the cache
pub capacity: GaugeVec<StaticLabelSet<CacheKind>>,
/// The total number of entries inserted into the cache
pub inserted_total: CounterVec<StaticLabelSet<CacheKind>>,
/// The total number of entries removed from the cache
pub evicted_total: CounterVec<CacheEvictionSet>,
/// The total number of cache requests
pub request_total: CounterVec<CacheOutcomeSet>,
}
impl Default for CacheMetrics {
fn default() -> Self {
Self::new()
}
}
#[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
#[label(singleton = "cache")]
pub enum CacheKind {
NodeInfo,
ProjectInfoEndpoints,
ProjectInfoRoles,
Schema,
}
#[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
pub enum CacheRemovalCause {
Expired,
Explicit,
Replaced,
Size,
}
#[derive(LabelGroup)]
#[label(set = CacheEvictionSet)]
pub struct CacheEviction {
pub cache: CacheKind,
pub cause: CacheRemovalCause,
}
#[derive(FixedCardinalityLabel, Copy, Clone)]
pub enum CacheOutcome {
Hit,
Miss,
}
#[derive(LabelGroup)]
#[label(set = CacheOutcomeSet)]
pub struct CacheOutcomeGroup {
pub cache: CacheKind,
pub outcome: CacheOutcome,
}

View File

@@ -1,6 +1,5 @@
use std::borrow::Cow;
use std::collections::HashMap;
use std::convert::Infallible;
use std::sync::Arc;
use bytes::Bytes;
@@ -55,12 +54,11 @@ use super::http_util::{
};
use super::json::JsonConversionError;
use crate::auth::backend::ComputeCredentialKeys;
use crate::cache::common::{count_cache_insert, count_cache_outcome, eviction_listener};
use crate::config::ProxyConfig;
use crate::context::RequestContext;
use crate::error::{ErrorKind, ReportableError, UserFacingError};
use crate::http::read_body_with_limit;
use crate::metrics::{CacheKind, Metrics};
use crate::metrics::Metrics;
use crate::serverless::sql_over_http::HEADER_VALUE_TRUE;
use crate::types::EndpointCacheKey;
use crate::util::deserialize_json_string;
@@ -140,31 +138,15 @@ pub struct ApiConfig {
}
// The DbSchemaCache is a cache of the ApiConfig and DbSchemaOwned for each endpoint
pub(crate) struct DbSchemaCache(Cache<EndpointCacheKey, Arc<(ApiConfig, DbSchemaOwned)>>);
pub(crate) struct DbSchemaCache(pub Cache<EndpointCacheKey, Arc<(ApiConfig, DbSchemaOwned)>>);
impl DbSchemaCache {
pub fn new(config: crate::config::CacheOptions) -> Self {
let builder = Cache::builder().name("schema");
let builder = Cache::builder().name("db_schema_cache");
let builder = config.moka(builder);
let metrics = &Metrics::get().cache;
if let Some(size) = config.size {
metrics.capacity.set(CacheKind::Schema, size as i64);
}
let builder =
builder.eviction_listener(|_k, _v, cause| eviction_listener(CacheKind::Schema, cause));
Self(builder.build())
}
pub async fn maintain(&self) -> Result<Infallible, anyhow::Error> {
let mut ticker = tokio::time::interval(std::time::Duration::from_secs(60));
loop {
ticker.tick().await;
self.0.run_pending_tasks();
}
}
pub async fn get_cached_or_remote(
&self,
endpoint_id: &EndpointCacheKey,
@@ -174,8 +156,7 @@ impl DbSchemaCache {
ctx: &RequestContext,
config: &'static ProxyConfig,
) -> Result<Arc<(ApiConfig, DbSchemaOwned)>, RestError> {
let cache_result = count_cache_outcome(CacheKind::Schema, self.0.get(endpoint_id));
match cache_result {
match self.0.get(endpoint_id) {
Some(v) => Ok(v),
None => {
info!("db_schema cache miss for endpoint: {:?}", endpoint_id);
@@ -199,7 +180,6 @@ impl DbSchemaCache {
db_extra_search_path: None,
};
let value = Arc::new((api_config, schema_owned));
count_cache_insert(CacheKind::Schema);
self.0.insert(endpoint_id.clone(), value);
return Err(e);
}
@@ -208,7 +188,6 @@ impl DbSchemaCache {
}
};
let value = Arc::new((api_config, schema_owned));
count_cache_insert(CacheKind::Schema);
self.0.insert(endpoint_id.clone(), value.clone());
Ok(value)
}

View File

@@ -9,10 +9,9 @@
```bash
export BENCHMARK_CONNSTR=postgres://user:pass@ep-abc-xyz-123.us-east-2.aws.neon.build/neondb
export CLICKHOUSE_PASSWORD=ch_password123
docker compose -f test_runner/logical_repl/clickhouse/docker-compose.yml up -d
./scripts/pytest -m remote_cluster -k 'test_clickhouse[release-pg17]'
./scripts/pytest -m remote_cluster -k test_clickhouse
docker compose -f test_runner/logical_repl/clickhouse/docker-compose.yml down
```
@@ -22,6 +21,6 @@ docker compose -f test_runner/logical_repl/clickhouse/docker-compose.yml down
export BENCHMARK_CONNSTR=postgres://user:pass@ep-abc-xyz-123.us-east-2.aws.neon.build/neondb
docker compose -f test_runner/logical_repl/debezium/docker-compose.yml up -d
./scripts/pytest -m remote_cluster -k 'test_debezium[release-pg17]'
./scripts/pytest -m remote_cluster -k test_debezium
docker compose -f test_runner/logical_repl/debezium/docker-compose.yml down
```

View File

@@ -1,11 +1,9 @@
services:
clickhouse:
image: clickhouse/clickhouse-server:25.6
image: clickhouse/clickhouse-server
user: "101:101"
container_name: clickhouse
hostname: clickhouse
environment:
- CLICKHOUSE_PASSWORD=${CLICKHOUSE_PASSWORD:-ch_password123}
ports:
- 127.0.0.1:8123:8123
- 127.0.0.1:9000:9000

View File

@@ -1,28 +1,18 @@
services:
zookeeper:
image: quay.io/debezium/zookeeper:3.1.3.Final
ports:
- 127.0.0.1:2181:2181
- 127.0.0.1:2888:2888
- 127.0.0.1:3888:3888
image: quay.io/debezium/zookeeper:2.7
kafka:
image: quay.io/debezium/kafka:3.1.3.Final
depends_on: [zookeeper]
image: quay.io/debezium/kafka:2.7
environment:
ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_LISTENERS: INTERNAL://:9092,EXTERNAL://:29092
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,EXTERNAL://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_BROKER_ID: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9991
ports:
- 9092:9092
- 29092:29092
- 127.0.0.1:9092:9092
debezium:
image: quay.io/debezium/connect:3.1.3.Final
depends_on: [kafka]
image: quay.io/debezium/connect:2.7
environment:
BOOTSTRAP_SERVERS: kafka:9092
GROUP_ID: 1

View File

@@ -53,13 +53,8 @@ def test_clickhouse(remote_pg: RemotePostgres):
cur.execute("CREATE TABLE table1 (id integer primary key, column1 varchar(10));")
cur.execute("INSERT INTO table1 (id, column1) VALUES (1, 'abc'), (2, 'def');")
conn.commit()
if "CLICKHOUSE_PASSWORD" not in os.environ:
raise RuntimeError("CLICKHOUSE_PASSWORD is not set")
client = clickhouse_connect.get_client(
host=clickhouse_host, password=os.environ["CLICKHOUSE_PASSWORD"]
)
client = clickhouse_connect.get_client(host=clickhouse_host)
client.command("SET allow_experimental_database_materialized_postgresql=1")
client.command("DROP DATABASE IF EXISTS db1_postgres")
client.command(
"CREATE DATABASE db1_postgres ENGINE = "
f"MaterializedPostgreSQL('{conn_options['host']}', "

View File

@@ -17,7 +17,6 @@ from fixtures.utils import wait_until
if TYPE_CHECKING:
from fixtures.neon_fixtures import RemotePostgres
from kafka import KafkaConsumer
class DebeziumAPI:
@@ -102,13 +101,9 @@ def debezium(remote_pg: RemotePostgres):
assert len(dbz.list_connectors()) == 1
from kafka import KafkaConsumer
kafka_host = "kafka" if (os.getenv("CI", "false") == "true") else "127.0.0.1"
kafka_port = 9092 if (os.getenv("CI", "false") == "true") else 29092
log.info("Connecting to Kafka: %s:%s", kafka_host, kafka_port)
consumer = KafkaConsumer(
"dbserver1.inventory.customers",
bootstrap_servers=[f"{kafka_host}:{kafka_port}"],
bootstrap_servers=["kafka:9092"],
auto_offset_reset="earliest",
enable_auto_commit=False,
)
@@ -117,7 +112,7 @@ def debezium(remote_pg: RemotePostgres):
assert resp.status_code == 204
def get_kafka_msg(consumer: KafkaConsumer, ts_ms, before=None, after=None) -> None:
def get_kafka_msg(consumer, ts_ms, before=None, after=None) -> None:
"""
Gets the message from Kafka and checks its validity
Arguments:
@@ -129,7 +124,6 @@ def get_kafka_msg(consumer: KafkaConsumer, ts_ms, before=None, after=None) -> No
after: a dictionary, if not None, the after field from the kafka message must
have the same values for the same keys
"""
log.info("Bootstrap servers: %s", consumer.config["bootstrap_servers"])
msg = consumer.poll()
assert msg, "Empty message"
for val in msg.values():

View File

@@ -55,7 +55,7 @@ def test_neon_extension_compatibility(neon_env_builder: NeonEnvBuilder):
# Ensure that the default version is also updated in the neon.control file
assert cur.fetchone() == ("1.6",)
cur.execute("SELECT * from neon.NEON_STAT_FILE_CACHE")
all_versions = ["1.7", "1.6", "1.5", "1.4", "1.3", "1.2", "1.1", "1.0"]
all_versions = ["1.6", "1.5", "1.4", "1.3", "1.2", "1.1", "1.0"]
current_version = "1.6"
for idx, begin_version in enumerate(all_versions):
for target_version in all_versions[idx + 1 :]:

View File

@@ -298,26 +298,15 @@ def test_pageserver_metrics_removed_after_detach(neon_env_builder: NeonEnvBuilde
assert post_detach_samples == set()
@pytest.mark.parametrize("compaction", ["compaction_enabled", "compaction_disabled"])
def test_pageserver_metrics_removed_after_offload(
neon_env_builder: NeonEnvBuilder, compaction: str
):
def test_pageserver_metrics_removed_after_offload(neon_env_builder: NeonEnvBuilder):
"""Tests that when a timeline is offloaded, the tenant specific metrics are not left behind"""
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.MOCK_S3)
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
tenant_1, _ = env.create_tenant(
conf={
# disable background compaction and GC so that we don't have leftover tasks
# after offloading.
"gc_period": "0s",
"compaction_period": "0s",
}
if compaction == "compaction_disabled"
else None
)
tenant_1, _ = env.create_tenant()
timeline_1 = env.create_timeline("test_metrics_removed_after_offload_1", tenant_id=tenant_1)
timeline_2 = env.create_timeline("test_metrics_removed_after_offload_2", tenant_id=tenant_1)
@@ -362,23 +351,6 @@ def test_pageserver_metrics_removed_after_offload(
state=TimelineArchivalState.ARCHIVED,
)
env.pageserver.http_client().timeline_offload(tenant_1, timeline)
# We need to wait until all background jobs are finished before we can check the metrics.
# There're many of them: compaction, GC, etc.
wait_until(
lambda: all(
sample.value == 0
for sample in env.pageserver.http_client()
.get_metrics()
.query_all("pageserver_background_loop_semaphore_waiting_tasks")
)
and all(
sample.value == 0
for sample in env.pageserver.http_client()
.get_metrics()
.query_all("pageserver_background_loop_semaphore_running_tasks")
)
)
post_offload_samples = set(
[x.name for x in get_ps_metric_samples_for_timeline(tenant_1, timeline)]
)