Mirror of https://github.com/neondatabase/neon.git, synced 2025-12-27 08:09:58 +00:00
Fix deadlock in drop_subscriptions_before_start (#10806)
ALTER SUBSCRIPTION requires an AccessExclusive lock, which conflicts with iteration over pg_subscription when multiple databases are present and the operations are applied concurrently. Fix by explicitly locking pg_subscription at the beginning of the transaction in each database.

## Problem

https://github.com/neondatabase/cloud/issues/24292
Committed by GitHub · parent 07bee60037 · commit 7c7180a79d
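Why the explicit lock helps: pg_subscription is a shared catalog, so the per-database cleanup transactions all touch the same table even though each one only reads its own database's subscriptions. Each transaction first holds a share lock on the catalog while iterating over it and, per the description above, ALTER SUBSCRIPTION then needs an AccessExclusive lock; two concurrent transactions can each hold the share lock and then block on each other when they try to upgrade, a classic lock-upgrade deadlock. Taking the AccessExclusive lock at the top of the transaction makes the second transaction queue behind the first instead. Below is a minimal sketch of the fixed pattern for a single database, where 'db0' stands in for the `{datname_str}` placeholder that compute_ctl fills in; the loop-closing lines are reconstructed, since the hunk shown further down is truncated.

```sql
DO $$
DECLARE
    subname TEXT;
BEGIN
    -- Take the strongest lock first so concurrent per-database cleanups
    -- serialize here instead of deadlocking on a later lock upgrade.
    LOCK TABLE pg_subscription IN ACCESS EXCLUSIVE MODE;
    FOR subname IN SELECT pg_subscription.subname FROM pg_subscription
        WHERE subdbid = (SELECT oid FROM pg_database WHERE datname = 'db0') LOOP
        EXECUTE format('ALTER SUBSCRIPTION %I DISABLE;', subname);
        EXECUTE format('ALTER SUBSCRIPTION %I SET (slot_name = NONE);', subname);
    END LOOP;
END $$;
```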
@@ -2,6 +2,7 @@ DO $$
DECLARE
    subname TEXT;
BEGIN
    LOCK TABLE pg_subscription IN ACCESS EXCLUSIVE MODE;
    FOR subname IN SELECT pg_subscription.subname FROM pg_subscription WHERE subdbid = (SELECT oid FROM pg_database WHERE datname = {datname_str}) LOOP
        EXECUTE format('ALTER SUBSCRIPTION %I DISABLE;', subname);
        EXECUTE format('ALTER SUBSCRIPTION %I SET (slot_name = NONE);', subname);

@@ -59,6 +59,7 @@ use nix::sys::signal::Signal;
use pageserver_api::shard::ShardStripeSize;
use reqwest::header::CONTENT_TYPE;
use serde::{Deserialize, Serialize};
use tracing::debug;
use url::Host;
use utils::id::{NodeId, TenantId, TimelineId};

@@ -81,8 +82,10 @@ pub struct EndpointConf {
    internal_http_port: u16,
    pg_version: u32,
    skip_pg_catalog_updates: bool,
    reconfigure_concurrency: usize,
    drop_subscriptions_before_start: bool,
    features: Vec<ComputeFeature>,
    cluster: Option<Cluster>,
}

//
@@ -179,7 +182,9 @@ impl ComputeControlPlane {
            // we also skip catalog updates in the cloud.
            skip_pg_catalog_updates,
            drop_subscriptions_before_start,
            reconfigure_concurrency: 1,
            features: vec![],
            cluster: None,
        });

        ep.create_endpoint_dir()?;
@@ -196,7 +201,9 @@ impl ComputeControlPlane {
                pg_version,
                skip_pg_catalog_updates,
                drop_subscriptions_before_start,
                reconfigure_concurrency: 1,
                features: vec![],
                cluster: None,
            })?,
        )?;
        std::fs::write(
@@ -261,8 +268,11 @@ pub struct Endpoint {
    skip_pg_catalog_updates: bool,

    drop_subscriptions_before_start: bool,
    reconfigure_concurrency: usize,
    // Feature flags
    features: Vec<ComputeFeature>,
    // Cluster settings
    cluster: Option<Cluster>,
}

#[derive(PartialEq, Eq)]
@@ -302,6 +312,8 @@ impl Endpoint {
        let conf: EndpointConf =
            serde_json::from_slice(&std::fs::read(entry.path().join("endpoint.json"))?)?;

        debug!("serialized endpoint conf: {:?}", conf);

        Ok(Endpoint {
            pg_address: SocketAddr::new(IpAddr::from(Ipv4Addr::LOCALHOST), conf.pg_port),
            external_http_address: SocketAddr::new(
@@ -319,8 +331,10 @@ impl Endpoint {
            tenant_id: conf.tenant_id,
            pg_version: conf.pg_version,
            skip_pg_catalog_updates: conf.skip_pg_catalog_updates,
            reconfigure_concurrency: conf.reconfigure_concurrency,
            drop_subscriptions_before_start: conf.drop_subscriptions_before_start,
            features: conf.features,
            cluster: conf.cluster,
        })
    }

@@ -607,7 +621,7 @@ impl Endpoint {
        };

        // Create spec file
        let spec = ComputeSpec {
        let mut spec = ComputeSpec {
            skip_pg_catalog_updates: self.skip_pg_catalog_updates,
            format_version: 1.0,
            operation_uuid: None,
@@ -640,7 +654,7 @@ impl Endpoint {
                    Vec::new()
                },
                settings: None,
                postgresql_conf: Some(postgresql_conf),
                postgresql_conf: Some(postgresql_conf.clone()),
            },
            delta_operations: None,
            tenant_id: Some(self.tenant_id),
@@ -653,9 +667,35 @@ impl Endpoint {
            pgbouncer_settings: None,
            shard_stripe_size: Some(shard_stripe_size),
            local_proxy_config: None,
            reconfigure_concurrency: 1,
            reconfigure_concurrency: self.reconfigure_concurrency,
            drop_subscriptions_before_start: self.drop_subscriptions_before_start,
        };

        // this strange code is needed to support respec() in tests
        if self.cluster.is_some() {
            debug!("Cluster is already set in the endpoint spec, using it");
            spec.cluster = self.cluster.clone().unwrap();

            debug!("spec.cluster {:?}", spec.cluster);

            // fill missing fields again
            if create_test_user {
                spec.cluster.roles.push(Role {
                    name: PgIdent::from_str("test").unwrap(),
                    encrypted_password: None,
                    options: None,
                });
                spec.cluster.databases.push(Database {
                    name: PgIdent::from_str("neondb").unwrap(),
                    owner: PgIdent::from_str("test").unwrap(),
                    options: None,
                    restrict_conn: false,
                    invalid: false,
                });
            }
            spec.cluster.postgresql_conf = Some(postgresql_conf);
        }

        let spec_path = self.endpoint_path().join("spec.json");
        std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?;

@@ -252,7 +252,7 @@ pub enum ComputeMode {
    Replica,
}

#[derive(Clone, Debug, Default, Deserialize, Serialize)]
#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
pub struct Cluster {
    pub cluster_id: Option<String>,
    pub name: Option<String>,
@@ -283,7 +283,7 @@ pub struct DeltaOp {

/// Rust representation of Postgres role info with only those fields
/// that matter for us.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
pub struct Role {
    pub name: PgIdent,
    pub encrypted_password: Option<String>,
@@ -292,7 +292,7 @@ pub struct Role {

/// Rust representation of Postgres database info with only those fields
/// that matter for us.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
pub struct Database {
    pub name: PgIdent,
    pub owner: PgIdent,
@@ -308,7 +308,7 @@ pub struct Database {
/// Common type representing both SQL statement params with or without value,
/// like `LOGIN` or `OWNER username` in the `CREATE/ALTER ROLE`, and config
/// options like `wal_level = logical`.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
pub struct GenericOption {
    pub name: String,
    pub value: Option<String>,

@@ -1,9 +1,10 @@
from __future__ import annotations

import threading
import time

from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv
from fixtures.neon_fixtures import NeonEnv, logical_replication_sync
from fixtures.utils import query_scalar, wait_until

@@ -239,3 +240,173 @@ def test_subscriber_branching(neon_simple_env: NeonEnv):
        res = scur_postgres.fetchall()
        assert len(res) == 1
        assert str(sub_child_2_timeline_id) == res[0][0]


def test_multiple_subscription_branching(neon_simple_env: NeonEnv):
    """
    Test that compute_ctl can handle concurrent deletion of subscriptions in multiple databases
    """
    env = neon_simple_env

    NUMBER_OF_DBS = 5

    # Create and start endpoint so that neon_local puts all the generated
    # stuff into the spec.json file.
    endpoint = env.endpoints.create_start(
        "main",
        config_lines=[
            "max_replication_slots = 10",
            "max_logical_replication_workers=10",
            "max_worker_processes=10",
        ],
    )

    TEST_DB_NAMES = [
        {
            "name": "neondb",
            "owner": "cloud_admin",
        },
        {
            "name": "publisher_db",
            "owner": "cloud_admin",
        },
    ]

    for i in range(NUMBER_OF_DBS):
        TEST_DB_NAMES.append(
            {
                "name": f"db{i}",
                "owner": "cloud_admin",
            }
        )

    # Update the spec.json file to create the databases
    # and reconfigure the endpoint to apply the changes.
    endpoint.respec_deep(
        **{
            "skip_pg_catalog_updates": False,
            "cluster": {
                "databases": TEST_DB_NAMES,
            },
        }
    )
    endpoint.reconfigure()

    connstr = endpoint.connstr(dbname="publisher_db").replace("'", "''")

    # create table, replication and subscription for each of the databases
    with endpoint.cursor(dbname="publisher_db") as publisher_cursor:
        for i in range(NUMBER_OF_DBS):
            publisher_cursor.execute(f"CREATE TABLE t{i}(a int)")
            publisher_cursor.execute(f"CREATE PUBLICATION mypub{i} FOR TABLE t{i}")
            publisher_cursor.execute(
                f"select pg_catalog.pg_create_logical_replication_slot('mysub{i}', 'pgoutput');"
            )
            publisher_cursor.execute(f"INSERT INTO t{i} VALUES ({i})")

            with endpoint.cursor(dbname=f"db{i}") as cursor:
                cursor.execute(f"CREATE TABLE t{i}(a int)")
                cursor.execute(
                    f"CREATE SUBSCRIPTION mysub{i} CONNECTION '{connstr}' PUBLICATION mypub{i} WITH (create_slot = false) "
                )

    # wait for the subscription to be active
    for i in range(NUMBER_OF_DBS):
        logical_replication_sync(
            endpoint,
            endpoint,
            f"mysub{i}",
            sub_dbname=f"db{i}",
            pub_dbname="publisher_db",
        )

    # Check that replication is working
    for i in range(NUMBER_OF_DBS):
        with endpoint.cursor(dbname=f"db{i}") as cursor:
            cursor.execute(f"SELECT * FROM t{i}")
            rows = cursor.fetchall()
            assert len(rows) == 1
            assert rows[0][0] == i

            last_insert_lsn = query_scalar(cursor, "select pg_current_wal_insert_lsn();")

    def start_publisher_workload(table_num: int, duration: int):
        start = time.time()
        with endpoint.cursor(dbname="publisher_db") as cur:
            while time.time() - start < duration:
                cur.execute(f"INSERT INTO t{table_num} SELECT FROM generate_series(1,1000)")

    LOAD_DURATION = 5
    threads = [
        threading.Thread(target=start_publisher_workload, args=(i, LOAD_DURATION))
        for i in range(NUMBER_OF_DBS)
    ]

    for thread in threads:
        thread.start()

    sub_child_1_timeline_id = env.create_branch(
        "subscriber_child_1",
        ancestor_branch_name="main",
        ancestor_start_lsn=last_insert_lsn,
    )

    sub_child_1 = env.endpoints.create("subscriber_child_1")

    sub_child_1.respec(
        skip_pg_catalog_updates=False,
        reconfigure_concurrency=5,
        drop_subscriptions_before_start=True,
        cluster={
            "databases": TEST_DB_NAMES,
            "roles": [],
        },
    )

    sub_child_1.start()

    # ensure that subscription deletion happened on this timeline
    with sub_child_1.cursor() as scur_postgres:
        scur_postgres.execute("SELECT timeline_id from neon.drop_subscriptions_done")
        res = scur_postgres.fetchall()
        log.info(f"res = {res}")
        assert len(res) == 1
        assert str(sub_child_1_timeline_id) == res[0][0]

    # ensure that there are no subscriptions in the databases
    for i in range(NUMBER_OF_DBS):
        with sub_child_1.cursor(dbname=f"db{i}") as cursor:
            cursor.execute("SELECT * FROM pg_catalog.pg_subscription")
            res = cursor.fetchall()
            assert len(res) == 0

            # ensure that there are no unexpected rows in the tables
            cursor.execute(f"SELECT * FROM t{i}")
            rows = cursor.fetchall()
            assert len(rows) == 1
            assert rows[0][0] == i

    for thread in threads:
        thread.join()

    # ensure that logical replication is still working in main endpoint
    # wait for it to catch up
    for i in range(NUMBER_OF_DBS):
        logical_replication_sync(
            endpoint,
            endpoint,
            f"mysub{i}",
            sub_dbname=f"db{i}",
            pub_dbname="publisher_db",
        )

    # verify that the data is the same in publisher and subscriber tables
    with endpoint.cursor(dbname="publisher_db") as publisher_cursor:
        for i in range(NUMBER_OF_DBS):
            with endpoint.cursor(dbname=f"db{i}") as cursor:
                publisher_cursor.execute(f"SELECT count(*) FROM t{i}")
                cursor.execute(f"SELECT count(*) FROM t{i}")
                pub_res = publisher_cursor.fetchone()
                sub_res = cursor.fetchone()
                log.info(f"for table t{i}: pub_res = {pub_res}, sub_res = {sub_res}")
                assert pub_res == sub_res