Compare commits

..

7 Commits

Author SHA1 Message Date
BodoBolero
6f913f2068 fix to use lakebase access token 2025-07-30 17:09:36 +02:00
BodoBolero
bb19dc96f3 change oidc host 2025-07-30 15:10:58 +02:00
BodoBolero
073d46ab80 run workflow before merged into main 2025-07-30 15:05:32 +02:00
BodoBolero
cc26e24bd8 Trigger GitHub to register workflow 2025-07-30 15:03:19 +02:00
BodoBolero
3c8d67ae44 try first version of benchmark on lakebase 2025-07-30 14:44:22 +02:00
BodoBolero
a6cb8a7f72 add project delete and bearer token actions 2025-07-29 17:03:17 +02:00
BodoBolero
01c7de98a2 first draft of project create for lakebase 2025-07-28 16:59:08 +02:00
30 changed files with 424 additions and 295 deletions

View File

@@ -27,6 +27,9 @@ config-variables:
- HETZNER_CACHE_BUCKET
- HETZNER_CACHE_ENDPOINT
- HETZNER_CACHE_REGION
- LAKEBASE_API_HOST
- LAKEBASE_OAUTH_CLIENT_ID
- LAKEBASE_ORG_ID
- NEON_DEV_AWS_ACCOUNT_ID
- NEON_PROD_AWS_ACCOUNT_ID
- PGREGRESS_PG16_PROJECT_ID

View File

@@ -0,0 +1,126 @@
name: 'Create Lakebase Project'
description: 'Create Lakebase Project using API'
inputs:
access_token:
description: 'Lakebase API access token'
required: true
org_id:
description: 'Organization ID, required'
required: true
api_host:
description: 'Lakebase API host, e.g. dbc-55e65913-66de.dev.databricks.com/lakebase-console'
required: true
postgres_version:
description: 'Postgres version; default is 16'
default: '17'
compute_units:
description: '[Min, Max] compute units'
default: '[1, 1]'
psql_path:
description: 'Path to psql binary - it is caller responsibility to provision the psql binary'
required: false
default: '/tmp/neon/pg_install/v16/bin/psql'
libpq_lib_path:
description: 'Path to directory containing libpq library - it is caller responsibility to provision the libpq library'
required: false
default: '/tmp/neon/pg_install/v16/lib'
project_settings:
description: 'A JSON object with project settings'
required: false
default: '{}'
fixed_hostname:
description: 'Fixed hostname to use for connection URI'
required: false
default: 'k8s-dpingres-serverle-09ade1e9e9-0d7f675c53b35938.elb.us-west-2.amazonaws.com'
region_id:
description: 'Project region ID'
required: false
default: 'aws-us-east-2'
outputs:
dsn:
description: 'Created Project DSN (for main database)'
value: ${{ steps.create-neon-project.outputs.dsn }}
project_id:
description: 'Created Project ID'
value: ${{ steps.create-neon-project.outputs.project_id }}
runs:
using: "composite"
steps:
- name: Create Lakebase Project
id: create-lakebase-project
# A shell without `set -x` to not to expose password/dsn in logs
shell: bash -euo pipefail {0}
run: |
res=$(curl \
"https://${API_HOST}/api/v2/projects" \
-w "%{http_code}" \
--header "Accept: application/json" \
--header "Content-Type: application/json" \
--header "Authorization: Bearer ${ACCESS_TOKEN}" \
--data "{
\"project\": {
\"org_id\": \"${ORG_ID}\",
\"name\": \"Created by actions/lakebase-project-create; GITHUB_RUN_ID=${GITHUB_RUN_ID}\",
\"pg_version\": ${POSTGRES_VERSION},
\"region_id\": \"${REGION_ID}\",
\"provisioner\": \"k8s-neonvm\",
\"autoscaling_limit_min_cu\": ${MIN_CU},
\"autoscaling_limit_max_cu\": ${MAX_CU},
\"settings\": ${PROJECT_SETTINGS}
}
}")
code=${res: -3}
if [[ ${code} -ge 400 ]]; then
echo Request failed with error code ${code}
echo ${res::-3}
exit 1
else
project=${res::-3}
fi
# Mask password
echo "::add-mask::$(echo $project | jq --raw-output '.roles[] | select(.name != "web_access") | .password')"
original_dsn=$(echo $project | jq --raw-output '.connection_uris[0].connection_uri')
echo "::add-mask::${original_dsn}"
# Extract endpoint ID from the original hostname
endpoint_id=$(echo "$original_dsn" | sed -n 's/.*@\(ep-[^.]*\)\..*/\1/p')
# Parse original URI components
user_pass=$(echo "$original_dsn" | sed -n 's/postgresql:\/\/\([^@]*\)@.*/\1/p')
database=$(echo "$original_dsn" | sed -n 's/.*\/\([^?]*\).*/\1/p')
# Construct the corrected DSN with fixed hostname and endpoint in options
if [[ "$original_dsn" == *"?"* ]]; then
# Extract existing query parameters
existing_params=$(echo "$original_dsn" | sed -n 's/.*?\(.*\)/\1/p')
dsn="postgresql://${user_pass}@${FIXED_HOSTNAME}/${database}?${existing_params}&options=endpoint%3d${endpoint_id}"
else
dsn="postgresql://${user_pass}@${FIXED_HOSTNAME}/${database}?options=endpoint%3d${endpoint_id}"
fi
echo "::add-mask::${dsn}"
echo "dsn=${dsn}" >> $GITHUB_OUTPUT
project_id=$(echo $project | jq --raw-output '.project.id')
echo "project_id=${project_id}" >> $GITHUB_OUTPUT
echo "Project ${project_id} has been created"
env:
API_HOST: ${{ inputs.api_host }}
ACCESS_TOKEN: ${{ inputs.access_token }}
ORG_ID: ${{ inputs.org_id }}
REGION_ID: ${{ inputs.region_id }}
POSTGRES_VERSION: ${{ inputs.postgres_version }}
MIN_CU: ${{ fromJSON(inputs.compute_units)[0] }}
MAX_CU: ${{ fromJSON(inputs.compute_units)[1] }}
PSQL: ${{ inputs.psql_path }}
LD_LIBRARY_PATH: ${{ inputs.libpq_lib_path }}
PROJECT_SETTINGS: ${{ inputs.project_settings }}
FIXED_HOSTNAME: ${{ inputs.fixed_hostname }}

View File

@@ -0,0 +1,39 @@
name: 'Delete Neon Project'
description: 'Delete Neon Project using API'
inputs:
access_token:
description: 'Lakebase API access token'
required: true
org_id:
description: 'Organization ID, required'
required: true
api_host:
description: 'Lakebase API host, e.g. dbc-55e65913-66de.dev.databricks.com/ajax-api/2.0/lakebase-console'
required: true
project_id:
description: 'ID of the Project to delete'
required: true
runs:
using: "composite"
steps:
- name: Delete Lakebase Project
# Do not try to delete a project if .github/actions/neon-project-create failed before
if: ${{ inputs.project_id != '' }}
shell: bash -euxo pipefail {0}
run: |
curl \
"https://${API_HOST}/api/v2/projects/${PROJECT_ID}" \
--fail \
--request DELETE \
--header "Accept: application/json" \
--header "Content-Type: application/json" \
--header "Authorization: Bearer ${ACCESS_TOKEN}"
echo "Project ${PROJECT_ID} has been deleted"
env:
API_HOST: ${{ inputs.api_host }}
ACCESS_TOKEN: ${{ inputs.access_token }}
ORG_ID: ${{ inputs.org_id }}
PROJECT_ID: ${{ inputs.project_id }}

View File

@@ -0,0 +1,166 @@
name: Lakebase Benchmarking
on:
# uncomment to run on push for debugging your PR
push:
branches: [ bodobolero/lakebase_perf_tests ]
workflow_dispatch: # adds ability to run this manually
inputs:
postgres_version:
description: 'Postgres version'
required: false
default: '17'
save_perf_report:
type: boolean
description: 'Publish perf report'
required: false
default: false
defaults:
run:
shell: bash -euxo pipefail {0}
concurrency:
# Allow only one workflow per any non-`main` branch.
group: ${{ github.workflow }}-${{ github.ref_name }}-${{ github.ref_name == 'main' && github.sha || 'anysha' }}
cancel-in-progress: true
jobs:
lakebase-pgbench:
permissions:
contents: write
statuses: write
id-token: write # aws-actions/configure-aws-credentials
env:
TEST_PG_BENCH_DURATIONS_MATRIX: "60m"
TEST_PG_BENCH_SCALES_MATRIX: "10gb"
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
PG_VERSION: ${{ github.event.inputs.postgres_version || '17' }}
TEST_OUTPUT: /tmp/test_output
BUILD_TYPE: remote
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || false }}
PLATFORM: "lakebase-captest-new"
# TODO: for lakehouse test-shard which is probably deployed in US-West we need to change the runner
# to us-west to get correct OLTP latencies due to added speed of light latency
runs-on: [ self-hosted, us-east-2, x64 ]
container:
image: ghcr.io/neondatabase/build-tools:pinned-bookworm
credentials:
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
options: --init
# Increase timeout to 8h, default timeout is 6h
timeout-minutes: 480
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
with:
egress-policy: audit
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@e3dd6a429d7300a6a4c196c26e071d42e0343502 # v4.0.2
with:
aws-region: eu-central-1
role-to-assume: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
role-duration-seconds: 18000 # 5 hours
- name: Download Neon artifact
uses: ./.github/actions/download
with:
name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact
path: /tmp/neon/
prefix: latest
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
## TODO, currently we cannot really specify a small min max CU for lakebase project
## and the semantic of a CU is different from Neon, so we need to carefully map
## compute sizes before comparing results
- name: Create Lakebase Project
id: create-lakebase-project
uses: ./.github/actions/lakebase-project-create
with:
api_host: ${{ vars.LAKEBASE_API_HOST }}
org_id: ${{ vars.LAKEBASE_ORG_ID }}
postgres_version: ${{ env.PG_VERSION }}
compute_units: '[1, 1]'
access_token: ${{ secrets.LAKEBASE_ACCESS_TOKEN }}
- name: Benchmark init
uses: ./.github/actions/run-python-test-set
with:
build_type: ${{ env.BUILD_TYPE }}
test_selection: performance
run_in_parallel: false
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 21600 -k test_pgbench_remote_init
pg_version: ${{ env.PG_VERSION }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
BENCHMARK_CONNSTR: ${{ steps.create-lakebase-project.outputs.dsn }}
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
- name: Benchmark simple-update
uses: ./.github/actions/run-python-test-set
with:
build_type: ${{ env.BUILD_TYPE }}
test_selection: performance
run_in_parallel: false
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 21600 -k test_pgbench_remote_simple_update
pg_version: ${{ env.PG_VERSION }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
BENCHMARK_CONNSTR: ${{ steps.create-lakebase-project.outputs.dsn }}
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
- name: Benchmark select-only
uses: ./.github/actions/run-python-test-set
with:
build_type: ${{ env.BUILD_TYPE }}
test_selection: performance
run_in_parallel: false
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 21600 -k test_pgbench_remote_select_only
pg_version: ${{ env.PG_VERSION }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
BENCHMARK_CONNSTR: ${{ steps.create-lakebase-project.outputs.dsn }}
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
- name: Delete Lakebase Project
if: ${{ steps.create-lakebase-project.outputs.project_id && always() }}
uses: ./.github/actions/lakebase-project-delete
with:
api_host: ${{ vars.LAKEBASE_API_HOST }}
org_id: ${{ vars.LAKEBASE_ORG_ID }}
project_id: ${{ steps.create-lakebase-project.outputs.project_id }}
access_token: ${{ secrets.LAKEBASE_ACCESS_TOKEN }}
- name: Create Allure report
id: create-allure-report
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report-generate
with:
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Post to a Slack channel
if: ${{ failure() }}
uses: slackapi/slack-github-action@fcfb566f8b0aab22203f066d80ca1d7e4b5d05b3 # v1.27.1
with:
channel-id: "C06KHQVQ7U3" # on-call-qa-staging-stream
slack-message: |
Lakebase perf testing: ${{ job.status }}
<${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|GitHub Run>
<${{ steps.create-allure-report.outputs.report-url }}|Allure report>
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}

View File

@@ -48,20 +48,8 @@ jobs:
uses: ./.github/workflows/build-build-tools-image.yml
secrets: inherit
generate-ch-tmppw:
runs-on: ubuntu-22.04
outputs:
tmp_val: ${{ steps.pwgen.outputs.tmp_val }}
steps:
- name: Generate a random password
id: pwgen
run: |
set +x
p=$(dd if=/dev/random bs=14 count=1 2>/dev/null | base64)
echo tmp_val="${p//\//}" >> "${GITHUB_OUTPUT}"
test-logical-replication:
needs: [ build-build-tools-image, generate-ch-tmppw ]
needs: [ build-build-tools-image ]
runs-on: ubuntu-22.04
container:
@@ -72,20 +60,16 @@ jobs:
options: --init --user root
services:
clickhouse:
image: clickhouse/clickhouse-server:24.8
env:
CLICKHOUSE_PASSWORD: ${{ needs.generate-ch-tmppw.outputs.tmp_val }}
image: clickhouse/clickhouse-server:24.6.3.64
ports:
- 9000:9000
- 8123:8123
zookeeper:
image: quay.io/debezium/zookeeper:3.1.3.Final
image: quay.io/debezium/zookeeper:2.7
ports:
- 2181:2181
- 2888:2888
- 3888:3888
kafka:
image: quay.io/debezium/kafka:3.1.3.Final
image: quay.io/debezium/kafka:2.7
env:
ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
@@ -95,7 +79,7 @@ jobs:
ports:
- 9092:9092
debezium:
image: quay.io/debezium/connect:3.1.3.Final
image: quay.io/debezium/connect:2.7
env:
BOOTSTRAP_SERVERS: kafka:9092
GROUP_ID: 1
@@ -141,7 +125,6 @@ jobs:
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
BENCHMARK_CONNSTR: ${{ steps.create-neon-project.outputs.dsn }}
CLICKHOUSE_PASSWORD: ${{ needs.generate-ch-tmppw.outputs.tmp_val }}
- name: Delete Neon Project
if: always()

View File

@@ -558,11 +558,11 @@ async fn add_request_id_header_to_response(
mut res: Response<Body>,
req_info: RequestInfo,
) -> Result<Response<Body>, ApiError> {
if let Some(request_id) = req_info.context::<RequestId>()
&& let Ok(request_header_value) = HeaderValue::from_str(&request_id.0)
{
res.headers_mut()
.insert(&X_REQUEST_ID_HEADER, request_header_value);
if let Some(request_id) = req_info.context::<RequestId>() {
if let Ok(request_header_value) = HeaderValue::from_str(&request_id.0) {
res.headers_mut()
.insert(&X_REQUEST_ID_HEADER, request_header_value);
};
};
Ok(res)

View File

@@ -72,10 +72,10 @@ impl Server {
if err.is_incomplete_message() || err.is_closed() || err.is_timeout() {
return true;
}
if let Some(inner) = err.source()
&& let Some(io) = inner.downcast_ref::<std::io::Error>()
{
return suppress_io_error(io);
if let Some(inner) = err.source() {
if let Some(io) = inner.downcast_ref::<std::io::Error>() {
return suppress_io_error(io);
}
}
false
}

View File

@@ -363,7 +363,7 @@ where
// TODO: An Iterator might be nicer. The communicator's clock algorithm needs to
// _slowly_ iterate through all buckets with its clock hand, without holding a lock.
// If we switch to an Iterator, it must not hold the lock.
pub fn get_at_bucket(&self, pos: usize) -> Option<ValueReadGuard<'_, (K, V)>> {
pub fn get_at_bucket(&self, pos: usize) -> Option<ValueReadGuard<(K, V)>> {
let map = unsafe { self.shared_ptr.as_ref() }.unwrap().read();
if pos >= map.buckets.len() {
return None;

View File

@@ -49,7 +49,7 @@ impl PerfSpan {
}
}
pub fn enter(&self) -> PerfSpanEntered<'_> {
pub fn enter(&self) -> PerfSpanEntered {
if let Some(ref id) = self.inner.id() {
self.dispatch.enter(id);
}

View File

@@ -52,7 +52,7 @@ pub(crate) fn regenerate(
};
// Express a static value for how many shards we may schedule on one node
const MAX_SHARDS: u32 = 2500;
const MAX_SHARDS: u32 = 5000;
let mut doc = PageserverUtilization {
disk_usage_bytes: used,

View File

@@ -79,6 +79,10 @@
#include "access/xlogrecovery.h"
#endif
#if PG_VERSION_NUM < 160000
typedef PGAlignedBlock PGIOAlignedBlock;
#endif
#define NEON_PANIC_CONNECTION_STATE(shard_no, elvl, message, ...) \
neon_shard_log(shard_no, elvl, "Broken connection state: " message, \
##__VA_ARGS__)

View File

@@ -635,11 +635,6 @@ lfc_init(void)
NULL);
}
/*
* Dump a list of pages that are currently in the LFC
*
* This is used to get a snapshot that can be used to prewarm the LFC later.
*/
FileCacheState*
lfc_get_state(size_t max_entries)
{
@@ -2272,3 +2267,4 @@ get_prewarm_info(PG_FUNCTION_ARGS)
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
}

View File

@@ -1,7 +1,7 @@
/*-------------------------------------------------------------------------
*
* neon.c
* Main entry point into the neon extension
* Main entry point into the neon exension
*
*-------------------------------------------------------------------------
*/
@@ -508,7 +508,7 @@ _PG_init(void)
DefineCustomBoolVariable(
"neon.disable_logical_replication_subscribers",
"Disable incoming logical replication",
"Disables incomming logical replication",
NULL,
&disable_logical_replication_subscribers,
false,
@@ -567,7 +567,7 @@ _PG_init(void)
DefineCustomEnumVariable(
"neon.debug_compare_local",
"Debug mode for comparing content of pages in prefetch ring/LFC/PS and local disk",
"Debug mode for compaing content of pages in prefetch ring/LFC/PS and local disk",
NULL,
&debug_compare_local,
DEBUG_COMPARE_LOCAL_NONE,
@@ -735,6 +735,7 @@ neon_shmem_request_hook(void)
static void
neon_shmem_startup_hook(void)
{
/* Initialize */
if (prev_shmem_startup_hook)
prev_shmem_startup_hook();

View File

@@ -167,7 +167,11 @@ extern neon_per_backend_counters *neon_per_backend_counters_shared;
*/
#define NUM_NEON_PERF_COUNTER_SLOTS (MaxBackends + NUM_AUXILIARY_PROCS)
#if PG_VERSION_NUM >= 170000
#define MyNeonCounters (&neon_per_backend_counters_shared[MyProcNumber])
#else
#define MyNeonCounters (&neon_per_backend_counters_shared[MyProc->pgprocno])
#endif
extern void inc_getpage_wait(uint64 latency);
extern void inc_page_cache_read_wait(uint64 latency);

View File

@@ -9,10 +9,6 @@
#include "fmgr.h"
#include "storage/buf_internals.h"
#if PG_MAJORVERSION_NUM < 16
typedef PGAlignedBlock PGIOAlignedBlock;
#endif
#if PG_MAJORVERSION_NUM < 17
#define NRelFileInfoBackendIsTemp(rinfo) (rinfo.backend != InvalidBackendId)
#else
@@ -162,10 +158,6 @@ InitBufferTag(BufferTag *tag, const RelFileNode *rnode,
#define AmAutoVacuumWorkerProcess() (IsAutoVacuumWorkerProcess())
#endif
#if PG_MAJORVERSION_NUM < 17
#define MyProcNumber (MyProc - &ProcGlobal->allProcs[0])
#endif
#if PG_MAJORVERSION_NUM < 15
extern void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags);
extern TimeLineID GetWALInsertionTimeLine(void);

View File

@@ -72,6 +72,10 @@
#include "access/xlogrecovery.h"
#endif
#if PG_VERSION_NUM < 160000
typedef PGAlignedBlock PGIOAlignedBlock;
#endif
#include "access/nbtree.h"
#include "storage/bufpage.h"
#include "access/xlog_internal.h"

View File

@@ -13,7 +13,6 @@
#include "neon.h"
#include "neon_pgversioncompat.h"
#include "miscadmin.h"
#include "pagestore_client.h"
#include RELFILEINFO_HDR
#include "storage/smgr.h"
@@ -24,6 +23,10 @@
#include "utils/dynahash.h"
#include "utils/guc.h"
#if PG_VERSION_NUM >= 150000
#include "miscadmin.h"
#endif
typedef struct
{
NRelFileInfo rinfo;

View File

@@ -535,7 +535,12 @@ pub async fn run() -> anyhow::Result<()> {
// add a task to flush the db_schema cache every 10 minutes
#[cfg(feature = "rest_broker")]
if let Some(db_schema_cache) = &config.rest_config.db_schema_cache {
maintenance_tasks.spawn(db_schema_cache.maintain());
maintenance_tasks.spawn(async move {
loop {
tokio::time::sleep(Duration::from_secs(600)).await;
db_schema_cache.0.run_pending_tasks();
}
});
}
if let Some(metrics_config) = &config.metric_collection {

View File

@@ -2,12 +2,8 @@ use std::ops::{Deref, DerefMut};
use std::time::{Duration, Instant};
use moka::Expiry;
use moka::notification::RemovalCause;
use crate::control_plane::messages::ControlPlaneErrorMessage;
use crate::metrics::{
CacheEviction, CacheKind, CacheOutcome, CacheOutcomeGroup, CacheRemovalCause, Metrics,
};
/// Default TTL used when caching errors from control plane.
pub const DEFAULT_ERROR_TTL: Duration = Duration::from_secs(30);
@@ -134,35 +130,3 @@ impl<K, V> Expiry<K, ControlPlaneResult<V>> for CplaneExpiry {
self.expire_early(value, updated_at)
}
}
pub fn eviction_listener(kind: CacheKind, cause: RemovalCause) {
let cause = match cause {
RemovalCause::Expired => CacheRemovalCause::Expired,
RemovalCause::Explicit => CacheRemovalCause::Explicit,
RemovalCause::Replaced => CacheRemovalCause::Replaced,
RemovalCause::Size => CacheRemovalCause::Size,
};
Metrics::get()
.cache
.evicted_total
.inc(CacheEviction { cache: kind, cause });
}
#[inline]
pub fn count_cache_outcome<T>(kind: CacheKind, cache_result: Option<T>) -> Option<T> {
let outcome = if cache_result.is_some() {
CacheOutcome::Hit
} else {
CacheOutcome::Miss
};
Metrics::get().cache.request_total.inc(CacheOutcomeGroup {
cache: kind,
outcome,
});
cache_result
}
#[inline]
pub fn count_cache_insert(kind: CacheKind) {
Metrics::get().cache.inserted_total.inc(kind);
}

View File

@@ -1,8 +1,7 @@
use crate::cache::common::{Cache, count_cache_insert, count_cache_outcome, eviction_listener};
use crate::cache::common::Cache;
use crate::cache::{Cached, ControlPlaneResult, CplaneExpiry};
use crate::config::CacheOptions;
use crate::control_plane::NodeInfo;
use crate::metrics::{CacheKind, Metrics};
use crate::types::EndpointCacheKey;
pub(crate) struct NodeInfoCache(moka::sync::Cache<EndpointCacheKey, ControlPlaneResult<NodeInfo>>);
@@ -20,30 +19,18 @@ impl Cache for NodeInfoCache {
impl NodeInfoCache {
pub fn new(config: CacheOptions) -> Self {
let builder = moka::sync::Cache::builder()
.name("node_info")
.name("node_info_cache")
.expire_after(CplaneExpiry::default());
let builder = config.moka(builder);
if let Some(size) = config.size {
Metrics::get()
.cache
.capacity
.set(CacheKind::NodeInfo, size as i64);
}
let builder = builder
.eviction_listener(|_k, _v, cause| eviction_listener(CacheKind::NodeInfo, cause));
Self(builder.build())
}
pub fn insert(&self, key: EndpointCacheKey, value: ControlPlaneResult<NodeInfo>) {
count_cache_insert(CacheKind::NodeInfo);
self.0.insert(key, value);
}
pub fn get(&self, key: &EndpointCacheKey) -> Option<ControlPlaneResult<NodeInfo>> {
count_cache_outcome(CacheKind::NodeInfo, self.0.get(key))
pub fn get(&'static self, key: &EndpointCacheKey) -> Option<ControlPlaneResult<NodeInfo>> {
self.0.get(key)
}
pub fn get_entry(

View File

@@ -5,14 +5,11 @@ use clashmap::ClashMap;
use moka::sync::Cache;
use tracing::{debug, info};
use crate::cache::common::{
ControlPlaneResult, CplaneExpiry, count_cache_insert, count_cache_outcome, eviction_listener,
};
use crate::cache::common::{ControlPlaneResult, CplaneExpiry};
use crate::config::ProjectInfoCacheOptions;
use crate::control_plane::messages::{ControlPlaneErrorMessage, Reason};
use crate::control_plane::{EndpointAccessControl, RoleAccessControl};
use crate::intern::{AccountIdInt, EndpointIdInt, ProjectIdInt, RoleNameInt};
use crate::metrics::{CacheKind, Metrics};
use crate::types::{EndpointId, RoleName};
/// Cache for project info.
@@ -85,32 +82,17 @@ impl ProjectInfoCache {
impl ProjectInfoCache {
pub(crate) fn new(config: ProjectInfoCacheOptions) -> Self {
Metrics::get().cache.capacity.set(
CacheKind::ProjectInfoRoles,
(config.size * config.max_roles) as i64,
);
Metrics::get()
.cache
.capacity
.set(CacheKind::ProjectInfoEndpoints, config.size as i64);
// we cache errors for 30 seconds, unless retry_at is set.
let expiry = CplaneExpiry::default();
Self {
role_controls: Cache::builder()
.name("project_info_roles")
.eviction_listener(|_k, _v, cause| {
eviction_listener(CacheKind::ProjectInfoRoles, cause);
})
.name("role_access_controls")
.max_capacity(config.size * config.max_roles)
.time_to_live(config.ttl)
.expire_after(expiry)
.build(),
ep_controls: Cache::builder()
.name("project_info_endpoints")
.eviction_listener(|_k, _v, cause| {
eviction_listener(CacheKind::ProjectInfoEndpoints, cause);
})
.name("endpoint_access_controls")
.max_capacity(config.size)
.time_to_live(config.ttl)
.expire_after(expiry)
@@ -129,10 +111,7 @@ impl ProjectInfoCache {
let endpoint_id = EndpointIdInt::get(endpoint_id)?;
let role_name = RoleNameInt::get(role_name)?;
count_cache_outcome(
CacheKind::ProjectInfoRoles,
self.role_controls.get(&(endpoint_id, role_name)),
)
self.role_controls.get(&(endpoint_id, role_name))
}
pub(crate) fn get_endpoint_access(
@@ -141,10 +120,7 @@ impl ProjectInfoCache {
) -> Option<ControlPlaneResult<EndpointAccessControl>> {
let endpoint_id = EndpointIdInt::get(endpoint_id)?;
count_cache_outcome(
CacheKind::ProjectInfoEndpoints,
self.ep_controls.get(&endpoint_id),
)
self.ep_controls.get(&endpoint_id)
}
pub(crate) fn insert_endpoint_access(
@@ -168,9 +144,6 @@ impl ProjectInfoCache {
"created a cache entry for endpoint access"
);
count_cache_insert(CacheKind::ProjectInfoEndpoints);
count_cache_insert(CacheKind::ProjectInfoRoles);
self.ep_controls.insert(endpoint_id, Ok(controls));
self.role_controls
.insert((endpoint_id, role_name), Ok(role_controls));
@@ -199,14 +172,10 @@ impl ProjectInfoCache {
// leave the entry alone if it's already Ok
Some(entry) if entry.value().is_ok() => moka::ops::compute::Op::Nop,
// replace the entry
_ => {
count_cache_insert(CacheKind::ProjectInfoEndpoints);
moka::ops::compute::Op::Put(Err(msg.clone()))
}
_ => moka::ops::compute::Op::Put(Err(msg.clone())),
});
}
count_cache_insert(CacheKind::ProjectInfoRoles);
self.role_controls
.insert((endpoint_id, role_name), Err(msg));
}

View File

@@ -8,8 +8,8 @@ use measured::label::{
use measured::metric::histogram::Thresholds;
use measured::metric::name::MetricName;
use measured::{
Counter, CounterVec, FixedCardinalityLabel, Gauge, GaugeVec, Histogram, HistogramVec,
LabelGroup, MetricGroup,
Counter, CounterVec, FixedCardinalityLabel, Gauge, Histogram, HistogramVec, LabelGroup,
MetricGroup,
};
use metrics::{CounterPairAssoc, CounterPairVec, HyperLogLogVec, InfoMetric};
use tokio::time::{self, Instant};
@@ -29,9 +29,6 @@ pub struct Metrics {
#[metric(namespace = "service")]
pub service: ServiceMetrics,
#[metric(namespace = "cache")]
pub cache: CacheMetrics,
}
static SELF: OnceLock<Metrics> = OnceLock::new();
@@ -222,6 +219,13 @@ pub enum Bool {
False,
}
#[derive(FixedCardinalityLabel, Copy, Clone)]
#[label(singleton = "outcome")]
pub enum CacheOutcome {
Hit,
Miss,
}
#[derive(LabelGroup)]
#[label(set = ConsoleRequestSet)]
pub struct ConsoleRequest<'a> {
@@ -553,6 +557,14 @@ impl From<bool> for Bool {
}
}
#[derive(LabelGroup)]
#[label(set = InvalidEndpointsSet)]
pub struct InvalidEndpointsGroup {
pub protocol: Protocol,
pub rejected: Bool,
pub outcome: ConnectOutcome,
}
#[derive(LabelGroup)]
#[label(set = RetriesMetricSet)]
pub struct RetriesMetricGroup {
@@ -692,59 +704,3 @@ pub enum ServiceState {
Running,
Terminating,
}
#[derive(MetricGroup)]
#[metric(new())]
pub struct CacheMetrics {
/// The capacity of the cache
pub capacity: GaugeVec<StaticLabelSet<CacheKind>>,
/// The total number of entries inserted into the cache
pub inserted_total: CounterVec<StaticLabelSet<CacheKind>>,
/// The total number of entries removed from the cache
pub evicted_total: CounterVec<CacheEvictionSet>,
/// The total number of cache requests
pub request_total: CounterVec<CacheOutcomeSet>,
}
impl Default for CacheMetrics {
fn default() -> Self {
Self::new()
}
}
#[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
#[label(singleton = "cache")]
pub enum CacheKind {
NodeInfo,
ProjectInfoEndpoints,
ProjectInfoRoles,
Schema,
}
#[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
pub enum CacheRemovalCause {
Expired,
Explicit,
Replaced,
Size,
}
#[derive(LabelGroup)]
#[label(set = CacheEvictionSet)]
pub struct CacheEviction {
pub cache: CacheKind,
pub cause: CacheRemovalCause,
}
#[derive(FixedCardinalityLabel, Copy, Clone)]
pub enum CacheOutcome {
Hit,
Miss,
}
#[derive(LabelGroup)]
#[label(set = CacheOutcomeSet)]
pub struct CacheOutcomeGroup {
pub cache: CacheKind,
pub outcome: CacheOutcome,
}

View File

@@ -1,6 +1,5 @@
use std::borrow::Cow;
use std::collections::HashMap;
use std::convert::Infallible;
use std::sync::Arc;
use bytes::Bytes;
@@ -55,12 +54,11 @@ use super::http_util::{
};
use super::json::JsonConversionError;
use crate::auth::backend::ComputeCredentialKeys;
use crate::cache::common::{count_cache_insert, count_cache_outcome, eviction_listener};
use crate::config::ProxyConfig;
use crate::context::RequestContext;
use crate::error::{ErrorKind, ReportableError, UserFacingError};
use crate::http::read_body_with_limit;
use crate::metrics::{CacheKind, Metrics};
use crate::metrics::Metrics;
use crate::serverless::sql_over_http::HEADER_VALUE_TRUE;
use crate::types::EndpointCacheKey;
use crate::util::deserialize_json_string;
@@ -140,31 +138,15 @@ pub struct ApiConfig {
}
// The DbSchemaCache is a cache of the ApiConfig and DbSchemaOwned for each endpoint
pub(crate) struct DbSchemaCache(Cache<EndpointCacheKey, Arc<(ApiConfig, DbSchemaOwned)>>);
pub(crate) struct DbSchemaCache(pub Cache<EndpointCacheKey, Arc<(ApiConfig, DbSchemaOwned)>>);
impl DbSchemaCache {
pub fn new(config: crate::config::CacheOptions) -> Self {
let builder = Cache::builder().name("schema");
let builder = Cache::builder().name("db_schema_cache");
let builder = config.moka(builder);
let metrics = &Metrics::get().cache;
if let Some(size) = config.size {
metrics.capacity.set(CacheKind::Schema, size as i64);
}
let builder =
builder.eviction_listener(|_k, _v, cause| eviction_listener(CacheKind::Schema, cause));
Self(builder.build())
}
pub async fn maintain(&self) -> Result<Infallible, anyhow::Error> {
let mut ticker = tokio::time::interval(std::time::Duration::from_secs(60));
loop {
ticker.tick().await;
self.0.run_pending_tasks();
}
}
pub async fn get_cached_or_remote(
&self,
endpoint_id: &EndpointCacheKey,
@@ -174,8 +156,7 @@ impl DbSchemaCache {
ctx: &RequestContext,
config: &'static ProxyConfig,
) -> Result<Arc<(ApiConfig, DbSchemaOwned)>, RestError> {
let cache_result = count_cache_outcome(CacheKind::Schema, self.0.get(endpoint_id));
match cache_result {
match self.0.get(endpoint_id) {
Some(v) => Ok(v),
None => {
info!("db_schema cache miss for endpoint: {:?}", endpoint_id);
@@ -199,7 +180,6 @@ impl DbSchemaCache {
db_extra_search_path: None,
};
let value = Arc::new((api_config, schema_owned));
count_cache_insert(CacheKind::Schema);
self.0.insert(endpoint_id.clone(), value);
return Err(e);
}
@@ -208,7 +188,6 @@ impl DbSchemaCache {
}
};
let value = Arc::new((api_config, schema_owned));
count_cache_insert(CacheKind::Schema);
self.0.insert(endpoint_id.clone(), value.clone());
Ok(value)
}

View File

@@ -102,7 +102,7 @@ pub struct ReportedError {
}
impl ReportedError {
pub fn new(e: impl UserFacingError + Into<anyhow::Error>) -> Self {
pub fn new(e: (impl UserFacingError + Into<anyhow::Error>)) -> Self {
let error_kind = e.get_error_kind();
Self {
source: e.into(),

View File

@@ -9,10 +9,9 @@
```bash
export BENCHMARK_CONNSTR=postgres://user:pass@ep-abc-xyz-123.us-east-2.aws.neon.build/neondb
export CLICKHOUSE_PASSWORD=ch_password123
docker compose -f test_runner/logical_repl/clickhouse/docker-compose.yml up -d
./scripts/pytest -m remote_cluster -k 'test_clickhouse[release-pg17]'
./scripts/pytest -m remote_cluster -k test_clickhouse
docker compose -f test_runner/logical_repl/clickhouse/docker-compose.yml down
```
@@ -22,6 +21,6 @@ docker compose -f test_runner/logical_repl/clickhouse/docker-compose.yml down
export BENCHMARK_CONNSTR=postgres://user:pass@ep-abc-xyz-123.us-east-2.aws.neon.build/neondb
docker compose -f test_runner/logical_repl/debezium/docker-compose.yml up -d
./scripts/pytest -m remote_cluster -k 'test_debezium[release-pg17]'
./scripts/pytest -m remote_cluster -k test_debezium
docker compose -f test_runner/logical_repl/debezium/docker-compose.yml down
```

View File

@@ -1,11 +1,9 @@
services:
clickhouse:
image: clickhouse/clickhouse-server:25.6
image: clickhouse/clickhouse-server
user: "101:101"
container_name: clickhouse
hostname: clickhouse
environment:
- CLICKHOUSE_PASSWORD=${CLICKHOUSE_PASSWORD:-ch_password123}
ports:
- 127.0.0.1:8123:8123
- 127.0.0.1:9000:9000

View File

@@ -1,28 +1,18 @@
services:
zookeeper:
image: quay.io/debezium/zookeeper:3.1.3.Final
ports:
- 127.0.0.1:2181:2181
- 127.0.0.1:2888:2888
- 127.0.0.1:3888:3888
image: quay.io/debezium/zookeeper:2.7
kafka:
image: quay.io/debezium/kafka:3.1.3.Final
depends_on: [zookeeper]
image: quay.io/debezium/kafka:2.7
environment:
ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_LISTENERS: INTERNAL://:9092,EXTERNAL://:29092
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,EXTERNAL://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_BROKER_ID: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9991
ports:
- 9092:9092
- 29092:29092
- 127.0.0.1:9092:9092
debezium:
image: quay.io/debezium/connect:3.1.3.Final
depends_on: [kafka]
image: quay.io/debezium/connect:2.7
environment:
BOOTSTRAP_SERVERS: kafka:9092
GROUP_ID: 1

View File

@@ -53,13 +53,8 @@ def test_clickhouse(remote_pg: RemotePostgres):
cur.execute("CREATE TABLE table1 (id integer primary key, column1 varchar(10));")
cur.execute("INSERT INTO table1 (id, column1) VALUES (1, 'abc'), (2, 'def');")
conn.commit()
if "CLICKHOUSE_PASSWORD" not in os.environ:
raise RuntimeError("CLICKHOUSE_PASSWORD is not set")
client = clickhouse_connect.get_client(
host=clickhouse_host, password=os.environ["CLICKHOUSE_PASSWORD"]
)
client = clickhouse_connect.get_client(host=clickhouse_host)
client.command("SET allow_experimental_database_materialized_postgresql=1")
client.command("DROP DATABASE IF EXISTS db1_postgres")
client.command(
"CREATE DATABASE db1_postgres ENGINE = "
f"MaterializedPostgreSQL('{conn_options['host']}', "

View File

@@ -17,7 +17,6 @@ from fixtures.utils import wait_until
if TYPE_CHECKING:
from fixtures.neon_fixtures import RemotePostgres
from kafka import KafkaConsumer
class DebeziumAPI:
@@ -102,13 +101,9 @@ def debezium(remote_pg: RemotePostgres):
assert len(dbz.list_connectors()) == 1
from kafka import KafkaConsumer
kafka_host = "kafka" if (os.getenv("CI", "false") == "true") else "127.0.0.1"
kafka_port = 9092 if (os.getenv("CI", "false") == "true") else 29092
log.info("Connecting to Kafka: %s:%s", kafka_host, kafka_port)
consumer = KafkaConsumer(
"dbserver1.inventory.customers",
bootstrap_servers=[f"{kafka_host}:{kafka_port}"],
bootstrap_servers=["kafka:9092"],
auto_offset_reset="earliest",
enable_auto_commit=False,
)
@@ -117,7 +112,7 @@ def debezium(remote_pg: RemotePostgres):
assert resp.status_code == 204
def get_kafka_msg(consumer: KafkaConsumer, ts_ms, before=None, after=None) -> None:
def get_kafka_msg(consumer, ts_ms, before=None, after=None) -> None:
"""
Gets the message from Kafka and checks its validity
Arguments:
@@ -129,7 +124,6 @@ def get_kafka_msg(consumer: KafkaConsumer, ts_ms, before=None, after=None) -> No
after: a dictionary, if not None, the after field from the kafka message must
have the same values for the same keys
"""
log.info("Bootstrap servers: %s", consumer.config["bootstrap_servers"])
msg = consumer.poll()
assert msg, "Empty message"
for val in msg.values():

View File

@@ -298,26 +298,15 @@ def test_pageserver_metrics_removed_after_detach(neon_env_builder: NeonEnvBuilde
assert post_detach_samples == set()
@pytest.mark.parametrize("compaction", ["compaction_enabled", "compaction_disabled"])
def test_pageserver_metrics_removed_after_offload(
neon_env_builder: NeonEnvBuilder, compaction: str
):
def test_pageserver_metrics_removed_after_offload(neon_env_builder: NeonEnvBuilder):
"""Tests that when a timeline is offloaded, the tenant specific metrics are not left behind"""
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.MOCK_S3)
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
tenant_1, _ = env.create_tenant(
conf={
# disable background compaction and GC so that we don't have leftover tasks
# after offloading.
"gc_period": "0s",
"compaction_period": "0s",
}
if compaction == "compaction_disabled"
else None
)
tenant_1, _ = env.create_tenant()
timeline_1 = env.create_timeline("test_metrics_removed_after_offload_1", tenant_id=tenant_1)
timeline_2 = env.create_timeline("test_metrics_removed_after_offload_2", tenant_id=tenant_1)
@@ -362,23 +351,6 @@ def test_pageserver_metrics_removed_after_offload(
state=TimelineArchivalState.ARCHIVED,
)
env.pageserver.http_client().timeline_offload(tenant_1, timeline)
# We need to wait until all background jobs are finished before we can check the metrics.
# There're many of them: compaction, GC, etc.
wait_until(
lambda: all(
sample.value == 0
for sample in env.pageserver.http_client()
.get_metrics()
.query_all("pageserver_background_loop_semaphore_waiting_tasks")
)
and all(
sample.value == 0
for sample in env.pageserver.http_client()
.get_metrics()
.query_all("pageserver_background_loop_semaphore_running_tasks")
)
)
post_offload_samples = set(
[x.name for x in get_ps_metric_samples_for_timeline(tenant_1, timeline)]
)