Compare commits

..

6 Commits

Author SHA1 Message Date
Conrad Ludgate
8549b42bca implement removal 2025-07-26 18:43:07 +01:00
Conrad Ludgate
b39498cf6b add basic ref counting 2025-07-26 18:41:52 +01:00
Conrad Ludgate
68ccdd910a move project info mappings into an arc 2025-07-26 18:40:37 +01:00
Conrad Ludgate
8b299bfb4e track account id and project id in the cache entry 2025-07-26 18:40:04 +01:00
Conrad Ludgate
3d2af1a83f use crossbeam-skiplist and clever range queries for a lock-free way to represent K->Set<V> 2025-07-26 18:40:00 +01:00
Conrad Ludgate
e4872bec57 add metrics for caches 2025-07-26 09:32:51 +01:00
14 changed files with 271 additions and 144 deletions

View File

@@ -31,15 +31,15 @@ jobs:
include:
- warehouses: 50 # defines number of warehouses and is used to compute number of terminals
max_rate: 800 # measured max TPS at scale factor based on experiments. Adjust if performance is better/worse
min_cu: 2 # simulate free tier plan (0.25 -2 CU)
min_cu: 0.25 # simulate free tier plan (0.25 -2 CU)
max_cu: 2
- warehouses: 500 # serverless plan (2-8 CU)
max_rate: 2000
min_cu: 8
min_cu: 2
max_cu: 8
- warehouses: 1000 # business plan (2-16 CU)
max_rate: 2900
min_cu: 16
min_cu: 2
max_cu: 16
max-parallel: 1 # we want to run each workload size sequentially to avoid noisy neighbors
permissions:

View File

@@ -48,20 +48,8 @@ jobs:
uses: ./.github/workflows/build-build-tools-image.yml
secrets: inherit
generate-ch-tmppw:
runs-on: ubuntu-22.04
outputs:
tmp_val: ${{ steps.pwgen.outputs.tmp_val }}
steps:
- name: Generate a random password
id: pwgen
run: |
set +x
p=$(dd if=/dev/random bs=14 count=1 2>/dev/null | base64)
echo tmp_val="${p//\//}" >> "${GITHUB_OUTPUT}"
test-logical-replication:
needs: [ build-build-tools-image, generate-ch-tmppw ]
needs: [ build-build-tools-image ]
runs-on: ubuntu-22.04
container:
@@ -72,20 +60,16 @@ jobs:
options: --init --user root
services:
clickhouse:
image: clickhouse/clickhouse-server:24.8
env:
CLICKHOUSE_PASSWORD: ${{ needs.generate-ch-tmppw.outputs.tmp_val }}
image: clickhouse/clickhouse-server:24.6.3.64
ports:
- 9000:9000
- 8123:8123
zookeeper:
image: quay.io/debezium/zookeeper:3.1.3.Final
image: quay.io/debezium/zookeeper:2.7
ports:
- 2181:2181
- 2888:2888
- 3888:3888
kafka:
image: quay.io/debezium/kafka:3.1.3.Final
image: quay.io/debezium/kafka:2.7
env:
ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
@@ -95,7 +79,7 @@ jobs:
ports:
- 9092:9092
debezium:
image: quay.io/debezium/connect:3.1.3.Final
image: quay.io/debezium/connect:2.7
env:
BOOTSTRAP_SERVERS: kafka:9092
GROUP_ID: 1
@@ -141,7 +125,6 @@ jobs:
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
BENCHMARK_CONNSTR: ${{ steps.create-neon-project.outputs.dsn }}
CLICKHOUSE_PASSWORD: ${{ needs.generate-ch-tmppw.outputs.tmp_val }}
- name: Delete Neon Project
if: always()

77
Cargo.lock generated
View File

@@ -253,6 +253,12 @@ dependencies = [
"syn 2.0.100",
]
[[package]]
name = "atomic-maybe-uninit"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bdca051497d8e15fcc5cb1b07ff75ee2afc3789b986ca9ee2af204847c0a9223"
[[package]]
name = "atomic-take"
version = "1.1.0"
@@ -977,7 +983,7 @@ dependencies = [
"bitflags 2.8.0",
"cexpr",
"clang-sys",
"itertools 0.12.1",
"itertools 0.13.0",
"log",
"prettyplease",
"proc-macro2",
@@ -1252,7 +1258,7 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "93bd59c81e2bd87a775ae2de75f070f7e2bfe97363a6ad652f46824564c23e4d"
dependencies = [
"crossbeam-utils",
"crossbeam-utils 0.8.19",
"hashbrown 0.15.2",
"lock_api",
"parking_lot_core 0.9.8",
@@ -1408,7 +1414,7 @@ version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973"
dependencies = [
"crossbeam-utils",
"crossbeam-utils 0.8.19",
]
[[package]]
@@ -1619,7 +1625,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a33c2bf77f2df06183c3aa30d1e96c0695a313d4f9c453cc3762a6db39f99200"
dependencies = [
"cfg-if",
"crossbeam-utils",
"crossbeam-utils 0.8.19",
]
[[package]]
@@ -1628,8 +1634,8 @@ version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "613f8cc01fe9cf1a3eb3d7f488fd2fa8388403e97039e2f73692932e291a770d"
dependencies = [
"crossbeam-epoch",
"crossbeam-utils",
"crossbeam-epoch 0.9.18 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-utils 0.8.19",
]
[[package]]
@@ -1638,7 +1644,24 @@ version = "0.9.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e"
dependencies = [
"crossbeam-utils",
"crossbeam-utils 0.8.19",
]
[[package]]
name = "crossbeam-epoch"
version = "0.9.18"
source = "git+https://github.com/crossbeam-rs/crossbeam?rev=8d24b3460bb7a9968e2ee21eeb42c16b0b5d02e9#8d24b3460bb7a9968e2ee21eeb42c16b0b5d02e9"
dependencies = [
"crossbeam-utils 0.8.21",
]
[[package]]
name = "crossbeam-skiplist"
version = "0.1.3"
source = "git+https://github.com/crossbeam-rs/crossbeam?rev=8d24b3460bb7a9968e2ee21eeb42c16b0b5d02e9#8d24b3460bb7a9968e2ee21eeb42c16b0b5d02e9"
dependencies = [
"crossbeam-epoch 0.9.18 (git+https://github.com/crossbeam-rs/crossbeam?rev=8d24b3460bb7a9968e2ee21eeb42c16b0b5d02e9)",
"crossbeam-utils 0.8.21",
]
[[package]]
@@ -1647,6 +1670,14 @@ version = "0.8.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345"
[[package]]
name = "crossbeam-utils"
version = "0.8.21"
source = "git+https://github.com/crossbeam-rs/crossbeam?rev=8d24b3460bb7a9968e2ee21eeb42c16b0b5d02e9#8d24b3460bb7a9968e2ee21eeb42c16b0b5d02e9"
dependencies = [
"atomic-maybe-uninit",
]
[[package]]
name = "crossterm"
version = "0.27.0"
@@ -1790,7 +1821,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf"
dependencies = [
"cfg-if",
"crossbeam-utils",
"crossbeam-utils 0.8.19",
"hashbrown 0.14.5",
"lock_api",
"once_cell",
@@ -3337,7 +3368,7 @@ dependencies = [
"ahash",
"clap",
"crossbeam-channel",
"crossbeam-utils",
"crossbeam-utils 0.8.19",
"dashmap 6.1.0",
"env_logger",
"indexmap 2.10.0",
@@ -3439,6 +3470,15 @@ dependencies = [
"either",
]
[[package]]
name = "itertools"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186"
dependencies = [
"either",
]
[[package]]
name = "itoa"
version = "1.0.11"
@@ -3742,7 +3782,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3051f3a030d55d680cdef6ca50e80abd1182f8da29f2344a7c9cb575721138f0"
dependencies = [
"bytes",
"crossbeam-utils",
"crossbeam-utils 0.8.19",
"hashbrown 0.14.5",
"itoa",
"lasso",
@@ -3906,8 +3946,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9321642ca94a4282428e6ea4af8cc2ca4eac48ac7a6a4ea8f33f76d0ce70926"
dependencies = [
"crossbeam-channel",
"crossbeam-epoch",
"crossbeam-utils",
"crossbeam-epoch 0.9.18 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-utils 0.8.19",
"loom",
"parking_lot 0.12.1",
"portable-atomic",
@@ -5340,7 +5380,7 @@ checksum = "0c1318b19085f08681016926435853bbf7858f9c082d0999b80550ff5d9abe15"
dependencies = [
"bytes",
"heck 0.5.0",
"itertools 0.12.1",
"itertools 0.13.0",
"log",
"multimap",
"once_cell",
@@ -5373,7 +5413,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d"
dependencies = [
"anyhow",
"itertools 0.12.1",
"itertools 0.13.0",
"proc-macro2",
"quote",
"syn 2.0.100",
@@ -5422,6 +5462,7 @@ dependencies = [
"clashmap",
"compute_api",
"consumption_metrics",
"crossbeam-skiplist",
"ecdsa 0.16.9",
"ed25519-dalek",
"env_logger",
@@ -5527,7 +5568,7 @@ version = "0.12.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3bd1fe6824cea6538803de3ff1bc0cf3949024db3d43c9643024bfb33a807c0e"
dependencies = [
"crossbeam-utils",
"crossbeam-utils 0.8.19",
"libc",
"once_cell",
"raw-cpuid",
@@ -5728,7 +5769,7 @@ checksum = "4b8f95bd6966f5c87776639160a66bd8ab9895d9d4ab01ddba9fc60661aebe8d"
dependencies = [
"crossbeam-channel",
"crossbeam-deque",
"crossbeam-utils",
"crossbeam-utils 0.8.19",
"num_cpus",
]
@@ -9014,8 +9055,8 @@ dependencies = [
"clap",
"clap_builder",
"const-oid",
"crossbeam-epoch",
"crossbeam-utils",
"crossbeam-epoch 0.9.18 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-utils 0.8.19",
"crypto-bigint 0.5.5",
"der 0.7.8",
"deranged",

View File

@@ -52,7 +52,7 @@ pub(crate) fn regenerate(
};
// Express a static value for how many shards we may schedule on one node
const MAX_SHARDS: u32 = 2500;
const MAX_SHARDS: u32 = 5000;
let mut doc = PageserverUtilization {
disk_usage_bytes: used,

View File

@@ -29,6 +29,7 @@ clap = { workspace = true, features = ["derive", "env"] }
clashmap.workspace = true
compute_api.workspace = true
consumption_metrics.workspace = true
crossbeam-skiplist = { git = "https://github.com/crossbeam-rs/crossbeam", rev = "8d24b3460bb7a9968e2ee21eeb42c16b0b5d02e9" }
env_logger.workspace = true
framed-websockets.workspace = true
futures.workspace = true

View File

@@ -20,7 +20,7 @@ impl Cache for NodeInfoCache {
impl NodeInfoCache {
pub fn new(config: CacheOptions) -> Self {
let builder = moka::sync::Cache::builder()
.name("node_info")
.name("node_info_cache")
.expire_after(CplaneExpiry::default());
let builder = config.moka(builder);

View File

@@ -1,7 +1,8 @@
use std::collections::HashSet;
use std::convert::Infallible;
use std::sync::{Arc, Mutex};
use clashmap::ClashMap;
use crossbeam_skiplist::SkipMap;
use crossbeam_skiplist::equivalent::{Comparable, Equivalent};
use moka::sync::Cache;
use tracing::{debug, info};
@@ -11,6 +12,7 @@ use crate::cache::common::{
use crate::config::ProjectInfoCacheOptions;
use crate::control_plane::messages::{ControlPlaneErrorMessage, Reason};
use crate::control_plane::{EndpointAccessControl, RoleAccessControl};
use crate::ext::LockExt;
use crate::intern::{AccountIdInt, EndpointIdInt, ProjectIdInt, RoleNameInt};
use crate::metrics::{CacheKind, Metrics};
use crate::types::{EndpointId, RoleName};
@@ -23,16 +25,83 @@ use crate::types::{EndpointId, RoleName};
/// One may ask, why the data is stored per project, when on the user request there is only data about the endpoint available?
/// On the cplane side updates are done per project (or per branch), so it's easier to invalidate the whole project cache.
pub struct ProjectInfoCache {
role_controls: Cache<(EndpointIdInt, RoleNameInt), ControlPlaneResult<RoleAccessControl>>,
ep_controls: Cache<EndpointIdInt, ControlPlaneResult<EndpointAccessControl>>,
role_controls:
Cache<(EndpointIdInt, RoleNameInt), ControlPlaneResult<Entry<RoleAccessControl>>>,
ep_controls: Cache<EndpointIdInt, ControlPlaneResult<Entry<EndpointAccessControl>>>,
project2ep: ClashMap<ProjectIdInt, HashSet<EndpointIdInt>>,
// FIXME(stefan): we need a way to GC the account2ep map.
account2ep: ClashMap<AccountIdInt, HashSet<EndpointIdInt>>,
project2ep: Arc<RefCountMultiSet<ProjectIdInt, EndpointIdInt>>,
account2ep: Arc<RefCountMultiSet<AccountIdInt, EndpointIdInt>>,
config: ProjectInfoCacheOptions,
}
type RefCount = Mutex<usize>;
// This is rather hacky.
// We use an ordered map of (K, V) -> RefCount.
// We use range queries over `(K, _)..(K+1, _)` to do the invalidation.
// We use the RefCount to know when to remove entries.
type RefCountMultiSet<K, V> = SkipMap<KeyValue<K, V>, RefCount>;
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug)]
struct KeyValue<K, V>(K, V);
struct Key<'a, K>(&'a K, bool);
impl<'a, K> Key<'a, K> {
fn prefix(key: &'a K) -> std::ops::Range<Self> {
Self(key, false)..Self(key, true)
}
}
impl<'a, K: Ord, V> Equivalent<Key<'a, K>> for KeyValue<K, V> {
fn equivalent(&self, key: &Key<'a, K>) -> bool {
self.0 == *key.0 && !key.1
}
}
impl<'a, K: Ord, V> Comparable<Key<'a, K>> for KeyValue<K, V> {
fn compare(&self, key: &Key<'a, K>) -> std::cmp::Ordering {
self.0.cmp(key.0).then(false.cmp(&key.1))
}
}
#[derive(Clone)]
struct Entry<T> {
project_id: Option<ProjectIdInt>,
account_id: Option<AccountIdInt>,
value: T,
}
impl<T> Entry<T> {
fn dec_ref_counts(
self,
project2ep: &RefCountMultiSet<ProjectIdInt, EndpointIdInt>,
account2ep: &RefCountMultiSet<AccountIdInt, EndpointIdInt>,
endpoint_id: EndpointIdInt,
) {
if let Some(project_id) = self.project_id {
dec_ref_count(project2ep, project_id, endpoint_id);
}
if let Some(account_id) = self.account_id {
dec_ref_count(account2ep, account_id, endpoint_id);
}
}
}
fn dec_ref_count<Id: Ord + Send + 'static>(
id2ep: &RefCountMultiSet<Id, EndpointIdInt>,
id: Id,
endpoint_id: EndpointIdInt,
) {
if let Some(entry) = id2ep.get(&KeyValue(id, endpoint_id)) {
let mut count = entry.value().lock_propagate_poison();
*count -= 1;
if *count == 0 {
// remove the entry while holding the lock
entry.remove();
}
}
}
impl ProjectInfoCache {
pub fn invalidate_endpoint_access(&self, endpoint_id: EndpointIdInt) {
info!("invalidating endpoint access for `{endpoint_id}`");
@@ -41,25 +110,17 @@ impl ProjectInfoCache {
pub fn invalidate_endpoint_access_for_project(&self, project_id: ProjectIdInt) {
info!("invalidating endpoint access for project `{project_id}`");
let endpoints = self
.project2ep
.get(&project_id)
.map(|kv| kv.value().clone())
.unwrap_or_default();
for endpoint_id in endpoints {
self.ep_controls.invalidate(&endpoint_id);
for entry in self.project2ep.range(Key::prefix(&project_id)) {
self.ep_controls.invalidate(&entry.key().1);
}
}
pub fn invalidate_endpoint_access_for_org(&self, account_id: AccountIdInt) {
info!("invalidating endpoint access for org `{account_id}`");
let endpoints = self
.account2ep
.get(&account_id)
.map(|kv| kv.value().clone())
.unwrap_or_default();
for endpoint_id in endpoints {
self.ep_controls.invalidate(&endpoint_id);
for entry in self.account2ep.range(Key::prefix(&account_id)) {
self.ep_controls.invalidate(&entry.key().1);
}
}
@@ -72,13 +133,9 @@ impl ProjectInfoCache {
"invalidating role secret for project_id `{}` and role_name `{}`",
project_id, role_name,
);
let endpoints = self
.project2ep
.get(&project_id)
.map(|kv| kv.value().clone())
.unwrap_or_default();
for endpoint_id in endpoints {
self.role_controls.invalidate(&(endpoint_id, role_name));
for entry in self.project2ep.range(Key::prefix(&project_id)) {
self.role_controls.invalidate(&(entry.key().1, role_name));
}
}
}
@@ -94,29 +151,50 @@ impl ProjectInfoCache {
.capacity
.set(CacheKind::ProjectInfoEndpoints, config.size as i64);
let project2ep = Arc::new(RefCountMultiSet::<ProjectIdInt, EndpointIdInt>::new());
let account2ep = Arc::new(RefCountMultiSet::<AccountIdInt, EndpointIdInt>::new());
let project2ep1 = Arc::clone(&project2ep);
let project2ep2 = Arc::clone(&project2ep);
let account2ep1 = Arc::clone(&account2ep);
let account2ep2 = Arc::clone(&account2ep);
// we cache errors for 30 seconds, unless retry_at is set.
let expiry = CplaneExpiry::default();
Self {
role_controls: Cache::builder()
.name("project_info_roles")
.eviction_listener(|_k, _v, cause| {
eviction_listener(CacheKind::ProjectInfoRoles, cause);
})
.name("role_access_controls")
.eviction_listener(
move |k, v: ControlPlaneResult<Entry<RoleAccessControl>>, cause| {
eviction_listener(CacheKind::ProjectInfoRoles, cause);
let (endpoint_id, _): (EndpointIdInt, RoleNameInt) = *k;
if let Ok(v) = v {
v.dec_ref_counts(&project2ep1, &account2ep1, endpoint_id);
}
},
)
.max_capacity(config.size * config.max_roles)
.time_to_live(config.ttl)
.expire_after(expiry)
.build(),
ep_controls: Cache::builder()
.name("project_info_endpoints")
.eviction_listener(|_k, _v, cause| {
eviction_listener(CacheKind::ProjectInfoEndpoints, cause);
})
.name("endpoint_access_controls")
.eviction_listener(
move |k, v: ControlPlaneResult<Entry<EndpointAccessControl>>, cause| {
eviction_listener(CacheKind::ProjectInfoEndpoints, cause);
let endpoint_id: EndpointIdInt = *k;
if let Ok(v) = v {
v.dec_ref_counts(&project2ep2, &account2ep2, endpoint_id);
}
},
)
.max_capacity(config.size)
.time_to_live(config.ttl)
.expire_after(expiry)
.build(),
project2ep: ClashMap::new(),
account2ep: ClashMap::new(),
project2ep,
account2ep,
config,
}
}
@@ -131,7 +209,9 @@ impl ProjectInfoCache {
count_cache_outcome(
CacheKind::ProjectInfoRoles,
self.role_controls.get(&(endpoint_id, role_name)),
self.role_controls
.get(&(endpoint_id, role_name))
.map(|e| e.map(|e| e.value)),
)
}
@@ -143,7 +223,9 @@ impl ProjectInfoCache {
count_cache_outcome(
CacheKind::ProjectInfoEndpoints,
self.ep_controls.get(&endpoint_id),
self.ep_controls
.get(&endpoint_id)
.map(|e| e.map(|e| e.value)),
)
}
@@ -156,11 +238,12 @@ impl ProjectInfoCache {
controls: EndpointAccessControl,
role_controls: RoleAccessControl,
) {
// 2 corresponds to how many cache inserts we do.
if let Some(account_id) = account_id {
self.insert_account2endpoint(account_id, endpoint_id);
self.inc_account2ep_ref(account_id, endpoint_id, 2);
}
if let Some(project_id) = project_id {
self.insert_project2endpoint(project_id, endpoint_id);
self.inc_project2ep_ref(project_id, endpoint_id, 2);
}
debug!(
@@ -171,9 +254,22 @@ impl ProjectInfoCache {
count_cache_insert(CacheKind::ProjectInfoEndpoints);
count_cache_insert(CacheKind::ProjectInfoRoles);
self.ep_controls.insert(endpoint_id, Ok(controls));
self.role_controls
.insert((endpoint_id, role_name), Ok(role_controls));
self.ep_controls.insert(
endpoint_id,
Ok(Entry {
account_id,
project_id,
value: controls,
}),
);
self.role_controls.insert(
(endpoint_id, role_name),
Ok(Entry {
account_id,
project_id,
value: role_controls,
}),
);
}
pub(crate) fn insert_endpoint_access_err(
@@ -211,22 +307,18 @@ impl ProjectInfoCache {
.insert((endpoint_id, role_name), Err(msg));
}
fn insert_project2endpoint(&self, project_id: ProjectIdInt, endpoint_id: EndpointIdInt) {
if let Some(mut endpoints) = self.project2ep.get_mut(&project_id) {
endpoints.insert(endpoint_id);
} else {
self.project2ep
.insert(project_id, HashSet::from([endpoint_id]));
}
fn inc_project2ep_ref(&self, project_id: ProjectIdInt, endpoint_id: EndpointIdInt, x: usize) {
let entry = self
.project2ep
.get_or_insert(KeyValue(project_id, endpoint_id), Mutex::new(0));
*entry.value().lock_propagate_poison() += x;
}
fn insert_account2endpoint(&self, account_id: AccountIdInt, endpoint_id: EndpointIdInt) {
if let Some(mut endpoints) = self.account2ep.get_mut(&account_id) {
endpoints.insert(endpoint_id);
} else {
self.account2ep
.insert(account_id, HashSet::from([endpoint_id]));
}
fn inc_account2ep_ref(&self, account_id: AccountIdInt, endpoint_id: EndpointIdInt, x: usize) {
let entry = self
.account2ep
.get_or_insert(KeyValue(account_id, endpoint_id), Mutex::new(0));
*entry.value().lock_propagate_poison() += x;
}
pub fn maybe_invalidate_role_secret(&self, _endpoint_id: &EndpointId, _role_name: &RoleName) {
@@ -291,6 +383,16 @@ mod tests {
},
);
cache.ep_controls.run_pending_tasks();
cache.role_controls.run_pending_tasks();
// check the project mappings are there
assert_eq!(cache.project2ep.len(), 1);
// check the ref counts
let entry = cache.project2ep.front().unwrap();
assert_eq!(*entry.value().lock_propagate_poison(), 2);
cache.insert_endpoint_access(
account_id,
project_id,
@@ -307,6 +409,17 @@ mod tests {
},
);
cache.ep_controls.run_pending_tasks();
cache.role_controls.run_pending_tasks();
// check the project mappings are still there
assert_eq!(cache.project2ep.len(), 1);
// check the ref counts
let entry = cache.project2ep.front().unwrap();
assert_eq!(*entry.value().lock_propagate_poison(), 3);
// check both entries exist
let cached = cache.get_role_secret(&endpoint_id, &user1).unwrap();
assert_eq!(cached.unwrap().secret, secret1);
@@ -334,13 +447,26 @@ mod tests {
},
);
cache.ep_controls.run_pending_tasks();
cache.role_controls.run_pending_tasks();
assert_eq!(cache.role_controls.entry_count(), 2);
// check the project mappings are still there
assert_eq!(cache.project2ep.len(), 1);
// check the ref counts are unchanged.
let entry = cache.project2ep.front().unwrap();
assert_eq!(*entry.value().lock_propagate_poison(), 3);
tokio::time::sleep(Duration::from_secs(2)).await;
cache.ep_controls.run_pending_tasks();
cache.role_controls.run_pending_tasks();
assert_eq!(cache.role_controls.entry_count(), 0);
// check the project/account mappings are no longer there
assert!(cache.project2ep.is_empty());
}
#[tokio::test]

View File

@@ -18,7 +18,7 @@ pub struct StringInterner<Id> {
_id: PhantomData<Id>,
}
#[derive(PartialEq, Debug, Clone, Copy, Eq, Hash)]
#[derive(PartialEq, Debug, Clone, Copy, Eq, Hash, PartialOrd, Ord)]
pub struct InternedString<Id> {
inner: Spur,
_id: PhantomData<Id>,
@@ -146,7 +146,7 @@ impl From<&RoleName> for RoleNameInt {
}
}
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct EndpointIdTag;
impl InternId for EndpointIdTag {
fn get_interner() -> &'static StringInterner<Self> {
@@ -186,7 +186,7 @@ impl From<BranchId> for BranchIdInt {
}
}
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ProjectIdTag;
impl InternId for ProjectIdTag {
fn get_interner() -> &'static StringInterner<Self> {
@@ -206,7 +206,7 @@ impl From<ProjectId> for ProjectIdInt {
}
}
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct AccountIdTag;
impl InternId for AccountIdTag {
fn get_interner() -> &'static StringInterner<Self> {

View File

@@ -143,7 +143,7 @@ pub struct ApiConfig {
pub(crate) struct DbSchemaCache(Cache<EndpointCacheKey, Arc<(ApiConfig, DbSchemaOwned)>>);
impl DbSchemaCache {
pub fn new(config: crate::config::CacheOptions) -> Self {
let builder = Cache::builder().name("schema");
let builder = Cache::builder().name("db_schema_cache");
let builder = config.moka(builder);
let metrics = &Metrics::get().cache;

View File

@@ -9,10 +9,9 @@
```bash
export BENCHMARK_CONNSTR=postgres://user:pass@ep-abc-xyz-123.us-east-2.aws.neon.build/neondb
export CLICKHOUSE_PASSWORD=ch_password123
docker compose -f test_runner/logical_repl/clickhouse/docker-compose.yml up -d
./scripts/pytest -m remote_cluster -k 'test_clickhouse[release-pg17]'
./scripts/pytest -m remote_cluster -k test_clickhouse
docker compose -f test_runner/logical_repl/clickhouse/docker-compose.yml down
```
@@ -22,6 +21,6 @@ docker compose -f test_runner/logical_repl/clickhouse/docker-compose.yml down
export BENCHMARK_CONNSTR=postgres://user:pass@ep-abc-xyz-123.us-east-2.aws.neon.build/neondb
docker compose -f test_runner/logical_repl/debezium/docker-compose.yml up -d
./scripts/pytest -m remote_cluster -k 'test_debezium[release-pg17]'
./scripts/pytest -m remote_cluster -k test_debezium
docker compose -f test_runner/logical_repl/debezium/docker-compose.yml down
```

View File

@@ -1,11 +1,9 @@
services:
clickhouse:
image: clickhouse/clickhouse-server:25.6
image: clickhouse/clickhouse-server
user: "101:101"
container_name: clickhouse
hostname: clickhouse
environment:
- CLICKHOUSE_PASSWORD=${CLICKHOUSE_PASSWORD:-ch_password123}
ports:
- 127.0.0.1:8123:8123
- 127.0.0.1:9000:9000

View File

@@ -1,28 +1,18 @@
services:
zookeeper:
image: quay.io/debezium/zookeeper:3.1.3.Final
ports:
- 127.0.0.1:2181:2181
- 127.0.0.1:2888:2888
- 127.0.0.1:3888:3888
image: quay.io/debezium/zookeeper:2.7
kafka:
image: quay.io/debezium/kafka:3.1.3.Final
depends_on: [zookeeper]
image: quay.io/debezium/kafka:2.7
environment:
ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_LISTENERS: INTERNAL://:9092,EXTERNAL://:29092
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,EXTERNAL://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_BROKER_ID: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9991
ports:
- 9092:9092
- 29092:29092
- 127.0.0.1:9092:9092
debezium:
image: quay.io/debezium/connect:3.1.3.Final
depends_on: [kafka]
image: quay.io/debezium/connect:2.7
environment:
BOOTSTRAP_SERVERS: kafka:9092
GROUP_ID: 1

View File

@@ -53,13 +53,8 @@ def test_clickhouse(remote_pg: RemotePostgres):
cur.execute("CREATE TABLE table1 (id integer primary key, column1 varchar(10));")
cur.execute("INSERT INTO table1 (id, column1) VALUES (1, 'abc'), (2, 'def');")
conn.commit()
if "CLICKHOUSE_PASSWORD" not in os.environ:
raise RuntimeError("CLICKHOUSE_PASSWORD is not set")
client = clickhouse_connect.get_client(
host=clickhouse_host, password=os.environ["CLICKHOUSE_PASSWORD"]
)
client = clickhouse_connect.get_client(host=clickhouse_host)
client.command("SET allow_experimental_database_materialized_postgresql=1")
client.command("DROP DATABASE IF EXISTS db1_postgres")
client.command(
"CREATE DATABASE db1_postgres ENGINE = "
f"MaterializedPostgreSQL('{conn_options['host']}', "

View File

@@ -17,7 +17,6 @@ from fixtures.utils import wait_until
if TYPE_CHECKING:
from fixtures.neon_fixtures import RemotePostgres
from kafka import KafkaConsumer
class DebeziumAPI:
@@ -102,13 +101,9 @@ def debezium(remote_pg: RemotePostgres):
assert len(dbz.list_connectors()) == 1
from kafka import KafkaConsumer
kafka_host = "kafka" if (os.getenv("CI", "false") == "true") else "127.0.0.1"
kafka_port = 9092 if (os.getenv("CI", "false") == "true") else 29092
log.info("Connecting to Kafka: %s:%s", kafka_host, kafka_port)
consumer = KafkaConsumer(
"dbserver1.inventory.customers",
bootstrap_servers=[f"{kafka_host}:{kafka_port}"],
bootstrap_servers=["kafka:9092"],
auto_offset_reset="earliest",
enable_auto_commit=False,
)
@@ -117,7 +112,7 @@ def debezium(remote_pg: RemotePostgres):
assert resp.status_code == 204
def get_kafka_msg(consumer: KafkaConsumer, ts_ms, before=None, after=None) -> None:
def get_kafka_msg(consumer, ts_ms, before=None, after=None) -> None:
"""
Gets the message from Kafka and checks its validity
Arguments:
@@ -129,7 +124,6 @@ def get_kafka_msg(consumer: KafkaConsumer, ts_ms, before=None, after=None) -> No
after: a dictionary, if not None, the after field from the kafka message must
have the same values for the same keys
"""
log.info("Bootstrap servers: %s", consumer.config["bootstrap_servers"])
msg = consumer.poll()
assert msg, "Empty message"
for val in msg.values():