Compare commits

..

4 Commits

Author SHA1 Message Date
BodoBolero
0ef628fd9e test a variant with fixed compute size 2025-07-28 17:01:14 +02:00
John Spray
60feb168e2 pageserver: decrease MAX_SHARDS in utilization (#12668)
## Problem

When tenants have a lot of timelines, the number of tenants that a
pageserver can comfortably handle goes down. Branching is much more
widely used in practice now than it was when this code was written, and
we generally run pageservers with a few thousand tenants (where each
tenant has many timelines), rather than the 10k-20k we might have done
historically.

This should really be something configurable, or a more direct proxy for
resource utilization (such as non-archived timeline count), but this
change should be a low effort improvement.

## Summary of changes

* Change the target shard count (MAX_SHARDS) to 2500 from 5000 when
calculating pageserver utilization (i.e. a 200% overcommit now
corresponds to 5000 shards, not 10000 shards)

Co-authored-by: John Spray <john.spray@databricks.com>
2025-07-28 13:50:18 +00:00
a-masterov
da596a5162 Update the versions for ClickHouse and Debezium (#12741)
## Problem
The test for logical replication used the year-old versions of
ClickHouse and Debezium so that we may miss problems related to
up-to-date versions.
## Summary of changes
The ClickHouse version has been updated to 24.8.
The Debezium version has been updated to the latest stable one,
3.1.3Final.
Some problems with locally running the Debezium test have been fixed.

---------

Co-authored-by: Alexey Masterov <alexey.masterov@databricks.com>
Co-authored-by: Alexander Bayandin <alexander@neon.tech>
2025-07-28 13:26:33 +00:00
Conrad Ludgate
effd6bf829 [proxy] add metrics for caches (#12752)
Exposes metrics for caches. LKB-2594

This exposes a high level namespace, `cache`, that all cache metrics can
be added to - this makes it easier to make library panels for the caches
as I understand it.

To calculate the current cache fill ratio, you could use the following
query:

```
(
    cache_inserted_total{cache="node_info"}
  - sum (cache_evicted_total{cache="node_info"}) without (cause)
)
  / cache_capacity{cache="node_info"}
```

To calculate the cache hit ratio, you could use the following query:

```
  cache_request_total{cache="node_info", outcome="hit"}
/ sum (cache_request_total{cache="node_info"}) without (outcome)
```
2025-07-28 10:41:49 +00:00
18 changed files with 238 additions and 383 deletions

View File

@@ -27,9 +27,6 @@ config-variables:
- HETZNER_CACHE_BUCKET
- HETZNER_CACHE_ENDPOINT
- HETZNER_CACHE_REGION
- LAKEBASE_API_HOST
- LAKEBASE_OAUTH_CLIENT_ID
- LAKEBASE_ORG_ID
- NEON_DEV_AWS_ACCOUNT_ID
- NEON_PROD_AWS_ACCOUNT_ID
- PGREGRESS_PG16_PROJECT_ID

View File

@@ -1,126 +0,0 @@
name: 'Create Lakebase Project'
description: 'Create Lakebase Project using API'
inputs:
access_token:
description: 'Lakebase API access token'
required: true
org_id:
description: 'Organization ID, required'
required: true
api_host:
description: 'Lakebase API host, e.g. dbc-55e65913-66de.dev.databricks.com/lakebase-console'
required: true
postgres_version:
description: 'Postgres version; default is 16'
default: '17'
compute_units:
description: '[Min, Max] compute units'
default: '[1, 1]'
psql_path:
description: 'Path to psql binary - it is caller responsibility to provision the psql binary'
required: false
default: '/tmp/neon/pg_install/v16/bin/psql'
libpq_lib_path:
description: 'Path to directory containing libpq library - it is caller responsibility to provision the libpq library'
required: false
default: '/tmp/neon/pg_install/v16/lib'
project_settings:
description: 'A JSON object with project settings'
required: false
default: '{}'
fixed_hostname:
description: 'Fixed hostname to use for connection URI'
required: false
default: 'k8s-dpingres-serverle-09ade1e9e9-0d7f675c53b35938.elb.us-west-2.amazonaws.com'
region_id:
description: 'Project region ID'
required: false
default: 'aws-us-east-2'
outputs:
dsn:
description: 'Created Project DSN (for main database)'
value: ${{ steps.create-neon-project.outputs.dsn }}
project_id:
description: 'Created Project ID'
value: ${{ steps.create-neon-project.outputs.project_id }}
runs:
using: "composite"
steps:
- name: Create Lakebase Project
id: create-lakebase-project
# A shell without `set -x` to not to expose password/dsn in logs
shell: bash -euo pipefail {0}
run: |
res=$(curl \
"https://${API_HOST}/api/v2/projects" \
-w "%{http_code}" \
--header "Accept: application/json" \
--header "Content-Type: application/json" \
--header "Authorization: Bearer ${ACCESS_TOKEN}" \
--data "{
\"project\": {
\"org_id\": \"${ORG_ID}\",
\"name\": \"Created by actions/lakebase-project-create; GITHUB_RUN_ID=${GITHUB_RUN_ID}\",
\"pg_version\": ${POSTGRES_VERSION},
\"region_id\": \"${REGION_ID}\",
\"provisioner\": \"k8s-neonvm\",
\"autoscaling_limit_min_cu\": ${MIN_CU},
\"autoscaling_limit_max_cu\": ${MAX_CU},
\"settings\": ${PROJECT_SETTINGS}
}
}")
code=${res: -3}
if [[ ${code} -ge 400 ]]; then
echo Request failed with error code ${code}
echo ${res::-3}
exit 1
else
project=${res::-3}
fi
# Mask password
echo "::add-mask::$(echo $project | jq --raw-output '.roles[] | select(.name != "web_access") | .password')"
original_dsn=$(echo $project | jq --raw-output '.connection_uris[0].connection_uri')
echo "::add-mask::${original_dsn}"
# Extract endpoint ID from the original hostname
endpoint_id=$(echo "$original_dsn" | sed -n 's/.*@\(ep-[^.]*\)\..*/\1/p')
# Parse original URI components
user_pass=$(echo "$original_dsn" | sed -n 's/postgresql:\/\/\([^@]*\)@.*/\1/p')
database=$(echo "$original_dsn" | sed -n 's/.*\/\([^?]*\).*/\1/p')
# Construct the corrected DSN with fixed hostname and endpoint in options
if [[ "$original_dsn" == *"?"* ]]; then
# Extract existing query parameters
existing_params=$(echo "$original_dsn" | sed -n 's/.*?\(.*\)/\1/p')
dsn="postgresql://${user_pass}@${FIXED_HOSTNAME}/${database}?${existing_params}&options=endpoint%3d${endpoint_id}"
else
dsn="postgresql://${user_pass}@${FIXED_HOSTNAME}/${database}?options=endpoint%3d${endpoint_id}"
fi
echo "::add-mask::${dsn}"
echo "dsn=${dsn}" >> $GITHUB_OUTPUT
project_id=$(echo $project | jq --raw-output '.project.id')
echo "project_id=${project_id}" >> $GITHUB_OUTPUT
echo "Project ${project_id} has been created"
env:
API_HOST: ${{ inputs.api_host }}
ACCESS_TOKEN: ${{ inputs.access_token }}
ORG_ID: ${{ inputs.org_id }}
REGION_ID: ${{ inputs.region_id }}
POSTGRES_VERSION: ${{ inputs.postgres_version }}
MIN_CU: ${{ fromJSON(inputs.compute_units)[0] }}
MAX_CU: ${{ fromJSON(inputs.compute_units)[1] }}
PSQL: ${{ inputs.psql_path }}
LD_LIBRARY_PATH: ${{ inputs.libpq_lib_path }}
PROJECT_SETTINGS: ${{ inputs.project_settings }}
FIXED_HOSTNAME: ${{ inputs.fixed_hostname }}

View File

@@ -1,39 +0,0 @@
name: 'Delete Neon Project'
description: 'Delete Neon Project using API'
inputs:
access_token:
description: 'Lakebase API access token'
required: true
org_id:
description: 'Organization ID, required'
required: true
api_host:
description: 'Lakebase API host, e.g. dbc-55e65913-66de.dev.databricks.com/ajax-api/2.0/lakebase-console'
required: true
project_id:
description: 'ID of the Project to delete'
required: true
runs:
using: "composite"
steps:
- name: Delete Lakebase Project
# Do not try to delete a project if .github/actions/neon-project-create failed before
if: ${{ inputs.project_id != '' }}
shell: bash -euxo pipefail {0}
run: |
curl \
"https://${API_HOST}/api/v2/projects/${PROJECT_ID}" \
--fail \
--request DELETE \
--header "Accept: application/json" \
--header "Content-Type: application/json" \
--header "Authorization: Bearer ${ACCESS_TOKEN}"
echo "Project ${PROJECT_ID} has been deleted"
env:
API_HOST: ${{ inputs.api_host }}
ACCESS_TOKEN: ${{ inputs.access_token }}
ORG_ID: ${{ inputs.org_id }}
PROJECT_ID: ${{ inputs.project_id }}

View File

@@ -31,15 +31,15 @@ jobs:
include:
- warehouses: 50 # defines number of warehouses and is used to compute number of terminals
max_rate: 800 # measured max TPS at scale factor based on experiments. Adjust if performance is better/worse
min_cu: 0.25 # simulate free tier plan (0.25 -2 CU)
min_cu: 2 # simulate free tier plan (0.25 -2 CU)
max_cu: 2
- warehouses: 500 # serverless plan (2-8 CU)
max_rate: 2000
min_cu: 2
min_cu: 8
max_cu: 8
- warehouses: 1000 # business plan (2-16 CU)
max_rate: 2900
min_cu: 2
min_cu: 16
max_cu: 16
max-parallel: 1 # we want to run each workload size sequentially to avoid noisy neighbors
permissions:

View File

@@ -1,166 +0,0 @@
name: Lakebase Benchmarking
on:
# uncomment to run on push for debugging your PR
push:
branches: [ bodobolero/lakebase_perf_tests ]
workflow_dispatch: # adds ability to run this manually
inputs:
postgres_version:
description: 'Postgres version'
required: false
default: '17'
save_perf_report:
type: boolean
description: 'Publish perf report'
required: false
default: false
defaults:
run:
shell: bash -euxo pipefail {0}
concurrency:
# Allow only one workflow per any non-`main` branch.
group: ${{ github.workflow }}-${{ github.ref_name }}-${{ github.ref_name == 'main' && github.sha || 'anysha' }}
cancel-in-progress: true
jobs:
lakebase-pgbench:
permissions:
contents: write
statuses: write
id-token: write # aws-actions/configure-aws-credentials
env:
TEST_PG_BENCH_DURATIONS_MATRIX: "60m"
TEST_PG_BENCH_SCALES_MATRIX: "10gb"
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
PG_VERSION: ${{ github.event.inputs.postgres_version || '17' }}
TEST_OUTPUT: /tmp/test_output
BUILD_TYPE: remote
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || false }}
PLATFORM: "lakebase-captest-new"
# TODO: for lakehouse test-shard which is probably deployed in US-West we need to change the runner
# to us-west to get correct OLTP latencies due to added speed of light latency
runs-on: [ self-hosted, us-east-2, x64 ]
container:
image: ghcr.io/neondatabase/build-tools:pinned-bookworm
credentials:
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
options: --init
# Increase timeout to 8h, default timeout is 6h
timeout-minutes: 480
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
with:
egress-policy: audit
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@e3dd6a429d7300a6a4c196c26e071d42e0343502 # v4.0.2
with:
aws-region: eu-central-1
role-to-assume: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
role-duration-seconds: 18000 # 5 hours
- name: Download Neon artifact
uses: ./.github/actions/download
with:
name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact
path: /tmp/neon/
prefix: latest
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
## TODO, currently we cannot really specify a small min max CU for lakebase project
## and the semantic of a CU is different from Neon, so we need to carefully map
## compute sizes before comparing results
- name: Create Lakebase Project
id: create-lakebase-project
uses: ./.github/actions/lakebase-project-create
with:
api_host: ${{ vars.LAKEBASE_API_HOST }}
org_id: ${{ vars.LAKEBASE_ORG_ID }}
postgres_version: ${{ env.PG_VERSION }}
compute_units: '[1, 1]'
access_token: ${{ secrets.LAKEBASE_ACCESS_TOKEN }}
- name: Benchmark init
uses: ./.github/actions/run-python-test-set
with:
build_type: ${{ env.BUILD_TYPE }}
test_selection: performance
run_in_parallel: false
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 21600 -k test_pgbench_remote_init
pg_version: ${{ env.PG_VERSION }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
BENCHMARK_CONNSTR: ${{ steps.create-lakebase-project.outputs.dsn }}
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
- name: Benchmark simple-update
uses: ./.github/actions/run-python-test-set
with:
build_type: ${{ env.BUILD_TYPE }}
test_selection: performance
run_in_parallel: false
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 21600 -k test_pgbench_remote_simple_update
pg_version: ${{ env.PG_VERSION }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
BENCHMARK_CONNSTR: ${{ steps.create-lakebase-project.outputs.dsn }}
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
- name: Benchmark select-only
uses: ./.github/actions/run-python-test-set
with:
build_type: ${{ env.BUILD_TYPE }}
test_selection: performance
run_in_parallel: false
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 21600 -k test_pgbench_remote_select_only
pg_version: ${{ env.PG_VERSION }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
BENCHMARK_CONNSTR: ${{ steps.create-lakebase-project.outputs.dsn }}
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
- name: Delete Lakebase Project
if: ${{ steps.create-lakebase-project.outputs.project_id && always() }}
uses: ./.github/actions/lakebase-project-delete
with:
api_host: ${{ vars.LAKEBASE_API_HOST }}
org_id: ${{ vars.LAKEBASE_ORG_ID }}
project_id: ${{ steps.create-lakebase-project.outputs.project_id }}
access_token: ${{ secrets.LAKEBASE_ACCESS_TOKEN }}
- name: Create Allure report
id: create-allure-report
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report-generate
with:
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Post to a Slack channel
if: ${{ failure() }}
uses: slackapi/slack-github-action@fcfb566f8b0aab22203f066d80ca1d7e4b5d05b3 # v1.27.1
with:
channel-id: "C06KHQVQ7U3" # on-call-qa-staging-stream
slack-message: |
Lakebase perf testing: ${{ job.status }}
<${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|GitHub Run>
<${{ steps.create-allure-report.outputs.report-url }}|Allure report>
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}

View File

@@ -48,8 +48,20 @@ jobs:
uses: ./.github/workflows/build-build-tools-image.yml
secrets: inherit
generate-ch-tmppw:
runs-on: ubuntu-22.04
outputs:
tmp_val: ${{ steps.pwgen.outputs.tmp_val }}
steps:
- name: Generate a random password
id: pwgen
run: |
set +x
p=$(dd if=/dev/random bs=14 count=1 2>/dev/null | base64)
echo tmp_val="${p//\//}" >> "${GITHUB_OUTPUT}"
test-logical-replication:
needs: [ build-build-tools-image ]
needs: [ build-build-tools-image, generate-ch-tmppw ]
runs-on: ubuntu-22.04
container:
@@ -60,16 +72,20 @@ jobs:
options: --init --user root
services:
clickhouse:
image: clickhouse/clickhouse-server:24.6.3.64
image: clickhouse/clickhouse-server:24.8
env:
CLICKHOUSE_PASSWORD: ${{ needs.generate-ch-tmppw.outputs.tmp_val }}
ports:
- 9000:9000
- 8123:8123
zookeeper:
image: quay.io/debezium/zookeeper:2.7
image: quay.io/debezium/zookeeper:3.1.3.Final
ports:
- 2181:2181
- 2888:2888
- 3888:3888
kafka:
image: quay.io/debezium/kafka:2.7
image: quay.io/debezium/kafka:3.1.3.Final
env:
ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
@@ -79,7 +95,7 @@ jobs:
ports:
- 9092:9092
debezium:
image: quay.io/debezium/connect:2.7
image: quay.io/debezium/connect:3.1.3.Final
env:
BOOTSTRAP_SERVERS: kafka:9092
GROUP_ID: 1
@@ -125,6 +141,7 @@ jobs:
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
BENCHMARK_CONNSTR: ${{ steps.create-neon-project.outputs.dsn }}
CLICKHOUSE_PASSWORD: ${{ needs.generate-ch-tmppw.outputs.tmp_val }}
- name: Delete Neon Project
if: always()

View File

@@ -52,7 +52,7 @@ pub(crate) fn regenerate(
};
// Express a static value for how many shards we may schedule on one node
const MAX_SHARDS: u32 = 5000;
const MAX_SHARDS: u32 = 2500;
let mut doc = PageserverUtilization {
disk_usage_bytes: used,

View File

@@ -535,12 +535,7 @@ pub async fn run() -> anyhow::Result<()> {
// add a task to flush the db_schema cache every 10 minutes
#[cfg(feature = "rest_broker")]
if let Some(db_schema_cache) = &config.rest_config.db_schema_cache {
maintenance_tasks.spawn(async move {
loop {
tokio::time::sleep(Duration::from_secs(600)).await;
db_schema_cache.0.run_pending_tasks();
}
});
maintenance_tasks.spawn(db_schema_cache.maintain());
}
if let Some(metrics_config) = &config.metric_collection {

View File

@@ -2,8 +2,12 @@ use std::ops::{Deref, DerefMut};
use std::time::{Duration, Instant};
use moka::Expiry;
use moka::notification::RemovalCause;
use crate::control_plane::messages::ControlPlaneErrorMessage;
use crate::metrics::{
CacheEviction, CacheKind, CacheOutcome, CacheOutcomeGroup, CacheRemovalCause, Metrics,
};
/// Default TTL used when caching errors from control plane.
pub const DEFAULT_ERROR_TTL: Duration = Duration::from_secs(30);
@@ -130,3 +134,35 @@ impl<K, V> Expiry<K, ControlPlaneResult<V>> for CplaneExpiry {
self.expire_early(value, updated_at)
}
}
pub fn eviction_listener(kind: CacheKind, cause: RemovalCause) {
let cause = match cause {
RemovalCause::Expired => CacheRemovalCause::Expired,
RemovalCause::Explicit => CacheRemovalCause::Explicit,
RemovalCause::Replaced => CacheRemovalCause::Replaced,
RemovalCause::Size => CacheRemovalCause::Size,
};
Metrics::get()
.cache
.evicted_total
.inc(CacheEviction { cache: kind, cause });
}
#[inline]
pub fn count_cache_outcome<T>(kind: CacheKind, cache_result: Option<T>) -> Option<T> {
let outcome = if cache_result.is_some() {
CacheOutcome::Hit
} else {
CacheOutcome::Miss
};
Metrics::get().cache.request_total.inc(CacheOutcomeGroup {
cache: kind,
outcome,
});
cache_result
}
#[inline]
pub fn count_cache_insert(kind: CacheKind) {
Metrics::get().cache.inserted_total.inc(kind);
}

View File

@@ -1,7 +1,8 @@
use crate::cache::common::Cache;
use crate::cache::common::{Cache, count_cache_insert, count_cache_outcome, eviction_listener};
use crate::cache::{Cached, ControlPlaneResult, CplaneExpiry};
use crate::config::CacheOptions;
use crate::control_plane::NodeInfo;
use crate::metrics::{CacheKind, Metrics};
use crate::types::EndpointCacheKey;
pub(crate) struct NodeInfoCache(moka::sync::Cache<EndpointCacheKey, ControlPlaneResult<NodeInfo>>);
@@ -19,18 +20,30 @@ impl Cache for NodeInfoCache {
impl NodeInfoCache {
pub fn new(config: CacheOptions) -> Self {
let builder = moka::sync::Cache::builder()
.name("node_info_cache")
.name("node_info")
.expire_after(CplaneExpiry::default());
let builder = config.moka(builder);
if let Some(size) = config.size {
Metrics::get()
.cache
.capacity
.set(CacheKind::NodeInfo, size as i64);
}
let builder = builder
.eviction_listener(|_k, _v, cause| eviction_listener(CacheKind::NodeInfo, cause));
Self(builder.build())
}
pub fn insert(&self, key: EndpointCacheKey, value: ControlPlaneResult<NodeInfo>) {
count_cache_insert(CacheKind::NodeInfo);
self.0.insert(key, value);
}
pub fn get(&'static self, key: &EndpointCacheKey) -> Option<ControlPlaneResult<NodeInfo>> {
self.0.get(key)
pub fn get(&self, key: &EndpointCacheKey) -> Option<ControlPlaneResult<NodeInfo>> {
count_cache_outcome(CacheKind::NodeInfo, self.0.get(key))
}
pub fn get_entry(

View File

@@ -5,11 +5,14 @@ use clashmap::ClashMap;
use moka::sync::Cache;
use tracing::{debug, info};
use crate::cache::common::{ControlPlaneResult, CplaneExpiry};
use crate::cache::common::{
ControlPlaneResult, CplaneExpiry, count_cache_insert, count_cache_outcome, eviction_listener,
};
use crate::config::ProjectInfoCacheOptions;
use crate::control_plane::messages::{ControlPlaneErrorMessage, Reason};
use crate::control_plane::{EndpointAccessControl, RoleAccessControl};
use crate::intern::{AccountIdInt, EndpointIdInt, ProjectIdInt, RoleNameInt};
use crate::metrics::{CacheKind, Metrics};
use crate::types::{EndpointId, RoleName};
/// Cache for project info.
@@ -82,17 +85,32 @@ impl ProjectInfoCache {
impl ProjectInfoCache {
pub(crate) fn new(config: ProjectInfoCacheOptions) -> Self {
Metrics::get().cache.capacity.set(
CacheKind::ProjectInfoRoles,
(config.size * config.max_roles) as i64,
);
Metrics::get()
.cache
.capacity
.set(CacheKind::ProjectInfoEndpoints, config.size as i64);
// we cache errors for 30 seconds, unless retry_at is set.
let expiry = CplaneExpiry::default();
Self {
role_controls: Cache::builder()
.name("role_access_controls")
.name("project_info_roles")
.eviction_listener(|_k, _v, cause| {
eviction_listener(CacheKind::ProjectInfoRoles, cause);
})
.max_capacity(config.size * config.max_roles)
.time_to_live(config.ttl)
.expire_after(expiry)
.build(),
ep_controls: Cache::builder()
.name("endpoint_access_controls")
.name("project_info_endpoints")
.eviction_listener(|_k, _v, cause| {
eviction_listener(CacheKind::ProjectInfoEndpoints, cause);
})
.max_capacity(config.size)
.time_to_live(config.ttl)
.expire_after(expiry)
@@ -111,7 +129,10 @@ impl ProjectInfoCache {
let endpoint_id = EndpointIdInt::get(endpoint_id)?;
let role_name = RoleNameInt::get(role_name)?;
self.role_controls.get(&(endpoint_id, role_name))
count_cache_outcome(
CacheKind::ProjectInfoRoles,
self.role_controls.get(&(endpoint_id, role_name)),
)
}
pub(crate) fn get_endpoint_access(
@@ -120,7 +141,10 @@ impl ProjectInfoCache {
) -> Option<ControlPlaneResult<EndpointAccessControl>> {
let endpoint_id = EndpointIdInt::get(endpoint_id)?;
self.ep_controls.get(&endpoint_id)
count_cache_outcome(
CacheKind::ProjectInfoEndpoints,
self.ep_controls.get(&endpoint_id),
)
}
pub(crate) fn insert_endpoint_access(
@@ -144,6 +168,9 @@ impl ProjectInfoCache {
"created a cache entry for endpoint access"
);
count_cache_insert(CacheKind::ProjectInfoEndpoints);
count_cache_insert(CacheKind::ProjectInfoRoles);
self.ep_controls.insert(endpoint_id, Ok(controls));
self.role_controls
.insert((endpoint_id, role_name), Ok(role_controls));
@@ -172,10 +199,14 @@ impl ProjectInfoCache {
// leave the entry alone if it's already Ok
Some(entry) if entry.value().is_ok() => moka::ops::compute::Op::Nop,
// replace the entry
_ => moka::ops::compute::Op::Put(Err(msg.clone())),
_ => {
count_cache_insert(CacheKind::ProjectInfoEndpoints);
moka::ops::compute::Op::Put(Err(msg.clone()))
}
});
}
count_cache_insert(CacheKind::ProjectInfoRoles);
self.role_controls
.insert((endpoint_id, role_name), Err(msg));
}

View File

@@ -8,8 +8,8 @@ use measured::label::{
use measured::metric::histogram::Thresholds;
use measured::metric::name::MetricName;
use measured::{
Counter, CounterVec, FixedCardinalityLabel, Gauge, Histogram, HistogramVec, LabelGroup,
MetricGroup,
Counter, CounterVec, FixedCardinalityLabel, Gauge, GaugeVec, Histogram, HistogramVec,
LabelGroup, MetricGroup,
};
use metrics::{CounterPairAssoc, CounterPairVec, HyperLogLogVec, InfoMetric};
use tokio::time::{self, Instant};
@@ -29,6 +29,9 @@ pub struct Metrics {
#[metric(namespace = "service")]
pub service: ServiceMetrics,
#[metric(namespace = "cache")]
pub cache: CacheMetrics,
}
static SELF: OnceLock<Metrics> = OnceLock::new();
@@ -219,13 +222,6 @@ pub enum Bool {
False,
}
#[derive(FixedCardinalityLabel, Copy, Clone)]
#[label(singleton = "outcome")]
pub enum CacheOutcome {
Hit,
Miss,
}
#[derive(LabelGroup)]
#[label(set = ConsoleRequestSet)]
pub struct ConsoleRequest<'a> {
@@ -704,3 +700,59 @@ pub enum ServiceState {
Running,
Terminating,
}
#[derive(MetricGroup)]
#[metric(new())]
pub struct CacheMetrics {
/// The capacity of the cache
pub capacity: GaugeVec<StaticLabelSet<CacheKind>>,
/// The total number of entries inserted into the cache
pub inserted_total: CounterVec<StaticLabelSet<CacheKind>>,
/// The total number of entries removed from the cache
pub evicted_total: CounterVec<CacheEvictionSet>,
/// The total number of cache requests
pub request_total: CounterVec<CacheOutcomeSet>,
}
impl Default for CacheMetrics {
fn default() -> Self {
Self::new()
}
}
#[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
#[label(singleton = "cache")]
pub enum CacheKind {
NodeInfo,
ProjectInfoEndpoints,
ProjectInfoRoles,
Schema,
}
#[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
pub enum CacheRemovalCause {
Expired,
Explicit,
Replaced,
Size,
}
#[derive(LabelGroup)]
#[label(set = CacheEvictionSet)]
pub struct CacheEviction {
pub cache: CacheKind,
pub cause: CacheRemovalCause,
}
#[derive(FixedCardinalityLabel, Copy, Clone)]
pub enum CacheOutcome {
Hit,
Miss,
}
#[derive(LabelGroup)]
#[label(set = CacheOutcomeSet)]
pub struct CacheOutcomeGroup {
pub cache: CacheKind,
pub outcome: CacheOutcome,
}

View File

@@ -1,5 +1,6 @@
use std::borrow::Cow;
use std::collections::HashMap;
use std::convert::Infallible;
use std::sync::Arc;
use bytes::Bytes;
@@ -54,11 +55,12 @@ use super::http_util::{
};
use super::json::JsonConversionError;
use crate::auth::backend::ComputeCredentialKeys;
use crate::cache::common::{count_cache_insert, count_cache_outcome, eviction_listener};
use crate::config::ProxyConfig;
use crate::context::RequestContext;
use crate::error::{ErrorKind, ReportableError, UserFacingError};
use crate::http::read_body_with_limit;
use crate::metrics::Metrics;
use crate::metrics::{CacheKind, Metrics};
use crate::serverless::sql_over_http::HEADER_VALUE_TRUE;
use crate::types::EndpointCacheKey;
use crate::util::deserialize_json_string;
@@ -138,15 +140,31 @@ pub struct ApiConfig {
}
// The DbSchemaCache is a cache of the ApiConfig and DbSchemaOwned for each endpoint
pub(crate) struct DbSchemaCache(pub Cache<EndpointCacheKey, Arc<(ApiConfig, DbSchemaOwned)>>);
pub(crate) struct DbSchemaCache(Cache<EndpointCacheKey, Arc<(ApiConfig, DbSchemaOwned)>>);
impl DbSchemaCache {
pub fn new(config: crate::config::CacheOptions) -> Self {
let builder = Cache::builder().name("db_schema_cache");
let builder = Cache::builder().name("schema");
let builder = config.moka(builder);
let metrics = &Metrics::get().cache;
if let Some(size) = config.size {
metrics.capacity.set(CacheKind::Schema, size as i64);
}
let builder =
builder.eviction_listener(|_k, _v, cause| eviction_listener(CacheKind::Schema, cause));
Self(builder.build())
}
pub async fn maintain(&self) -> Result<Infallible, anyhow::Error> {
let mut ticker = tokio::time::interval(std::time::Duration::from_secs(60));
loop {
ticker.tick().await;
self.0.run_pending_tasks();
}
}
pub async fn get_cached_or_remote(
&self,
endpoint_id: &EndpointCacheKey,
@@ -156,7 +174,8 @@ impl DbSchemaCache {
ctx: &RequestContext,
config: &'static ProxyConfig,
) -> Result<Arc<(ApiConfig, DbSchemaOwned)>, RestError> {
match self.0.get(endpoint_id) {
let cache_result = count_cache_outcome(CacheKind::Schema, self.0.get(endpoint_id));
match cache_result {
Some(v) => Ok(v),
None => {
info!("db_schema cache miss for endpoint: {:?}", endpoint_id);
@@ -180,6 +199,7 @@ impl DbSchemaCache {
db_extra_search_path: None,
};
let value = Arc::new((api_config, schema_owned));
count_cache_insert(CacheKind::Schema);
self.0.insert(endpoint_id.clone(), value);
return Err(e);
}
@@ -188,6 +208,7 @@ impl DbSchemaCache {
}
};
let value = Arc::new((api_config, schema_owned));
count_cache_insert(CacheKind::Schema);
self.0.insert(endpoint_id.clone(), value.clone());
Ok(value)
}

View File

@@ -9,9 +9,10 @@
```bash
export BENCHMARK_CONNSTR=postgres://user:pass@ep-abc-xyz-123.us-east-2.aws.neon.build/neondb
export CLICKHOUSE_PASSWORD=ch_password123
docker compose -f test_runner/logical_repl/clickhouse/docker-compose.yml up -d
./scripts/pytest -m remote_cluster -k test_clickhouse
./scripts/pytest -m remote_cluster -k 'test_clickhouse[release-pg17]'
docker compose -f test_runner/logical_repl/clickhouse/docker-compose.yml down
```
@@ -21,6 +22,6 @@ docker compose -f test_runner/logical_repl/clickhouse/docker-compose.yml down
export BENCHMARK_CONNSTR=postgres://user:pass@ep-abc-xyz-123.us-east-2.aws.neon.build/neondb
docker compose -f test_runner/logical_repl/debezium/docker-compose.yml up -d
./scripts/pytest -m remote_cluster -k test_debezium
./scripts/pytest -m remote_cluster -k 'test_debezium[release-pg17]'
docker compose -f test_runner/logical_repl/debezium/docker-compose.yml down
```

View File

@@ -1,9 +1,11 @@
services:
clickhouse:
image: clickhouse/clickhouse-server
image: clickhouse/clickhouse-server:25.6
user: "101:101"
container_name: clickhouse
hostname: clickhouse
environment:
- CLICKHOUSE_PASSWORD=${CLICKHOUSE_PASSWORD:-ch_password123}
ports:
- 127.0.0.1:8123:8123
- 127.0.0.1:9000:9000

View File

@@ -1,18 +1,28 @@
services:
zookeeper:
image: quay.io/debezium/zookeeper:2.7
image: quay.io/debezium/zookeeper:3.1.3.Final
ports:
- 127.0.0.1:2181:2181
- 127.0.0.1:2888:2888
- 127.0.0.1:3888:3888
kafka:
image: quay.io/debezium/kafka:2.7
image: quay.io/debezium/kafka:3.1.3.Final
depends_on: [zookeeper]
environment:
ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_LISTENERS: INTERNAL://:9092,EXTERNAL://:29092
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,EXTERNAL://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_BROKER_ID: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9991
ports:
- 127.0.0.1:9092:9092
- 9092:9092
- 29092:29092
debezium:
image: quay.io/debezium/connect:2.7
image: quay.io/debezium/connect:3.1.3.Final
depends_on: [kafka]
environment:
BOOTSTRAP_SERVERS: kafka:9092
GROUP_ID: 1

View File

@@ -53,8 +53,13 @@ def test_clickhouse(remote_pg: RemotePostgres):
cur.execute("CREATE TABLE table1 (id integer primary key, column1 varchar(10));")
cur.execute("INSERT INTO table1 (id, column1) VALUES (1, 'abc'), (2, 'def');")
conn.commit()
client = clickhouse_connect.get_client(host=clickhouse_host)
if "CLICKHOUSE_PASSWORD" not in os.environ:
raise RuntimeError("CLICKHOUSE_PASSWORD is not set")
client = clickhouse_connect.get_client(
host=clickhouse_host, password=os.environ["CLICKHOUSE_PASSWORD"]
)
client.command("SET allow_experimental_database_materialized_postgresql=1")
client.command("DROP DATABASE IF EXISTS db1_postgres")
client.command(
"CREATE DATABASE db1_postgres ENGINE = "
f"MaterializedPostgreSQL('{conn_options['host']}', "

View File

@@ -17,6 +17,7 @@ from fixtures.utils import wait_until
if TYPE_CHECKING:
from fixtures.neon_fixtures import RemotePostgres
from kafka import KafkaConsumer
class DebeziumAPI:
@@ -101,9 +102,13 @@ def debezium(remote_pg: RemotePostgres):
assert len(dbz.list_connectors()) == 1
from kafka import KafkaConsumer
kafka_host = "kafka" if (os.getenv("CI", "false") == "true") else "127.0.0.1"
kafka_port = 9092 if (os.getenv("CI", "false") == "true") else 29092
log.info("Connecting to Kafka: %s:%s", kafka_host, kafka_port)
consumer = KafkaConsumer(
"dbserver1.inventory.customers",
bootstrap_servers=["kafka:9092"],
bootstrap_servers=[f"{kafka_host}:{kafka_port}"],
auto_offset_reset="earliest",
enable_auto_commit=False,
)
@@ -112,7 +117,7 @@ def debezium(remote_pg: RemotePostgres):
assert resp.status_code == 204
def get_kafka_msg(consumer, ts_ms, before=None, after=None) -> None:
def get_kafka_msg(consumer: KafkaConsumer, ts_ms, before=None, after=None) -> None:
"""
Gets the message from Kafka and checks its validity
Arguments:
@@ -124,6 +129,7 @@ def get_kafka_msg(consumer, ts_ms, before=None, after=None) -> None:
after: a dictionary, if not None, the after field from the kafka message must
have the same values for the same keys
"""
log.info("Bootstrap servers: %s", consumer.config["bootstrap_servers"])
msg = consumer.poll()
assert msg, "Empty message"
for val in msg.values():