Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-08 14:02:55 +00:00
Disable logical replication subscribers (#10249)
Drop logical replication subscribers before compute starts on a non-main branch.

Add a new compute_ctl spec flag, drop_subscriptions_before_start: if it is set, drop all subscriptions from the compute node before it starts. To avoid a race on compute start, use the new GUC neon.disable_logical_replication_subscribers to temporarily disable logical replication workers until we drop the subscriptions.

Ensure that we drop subscriptions exactly once, when the endpoint first starts on a new branch. This is essential because otherwise we may drop not only inherited but also newly created subscriptions. We cannot rely on the spec.drop_subscriptions_before_start flag alone: if for some reason the compute restarts inside the VM, it will start again with the same spec and flag value. To handle this, we record the fact of the operation in the database, in the neon.drop_subscriptions_done table. If the table does not exist, we assume that the operation was never performed, so we must do it. If the table exists, we check whether the operation was already performed on the current timeline.

fixes: https://github.com/neondatabase/neon/issues/8790
committed by GitHub
parent 92d95b08cf
commit 8e8df1b453
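In outline, the startup flow this commit implements (a condensed orientation sketch; phase names match the diff below, but this is not the committed code):

    fn startup_outline(drop_flag_in_spec: bool, done_for_this_timeline: bool) {
        // 1. postgresql.conf is written with
        //    neon.disable_logical_replication_subscribers set from the spec flag,
        //    so subscription apply workers stay parked during cleanup.
        if drop_flag_in_spec && !done_for_this_timeline {
            // 2. per-database phase DropLogicalSubscriptions: DROP SUBSCRIPTION
            //    for everything inherited from the parent branch.
            // 3. phase FinalizeDropLogicalSubscriptions: record the current
            //    timeline_id in neon.drop_subscriptions_done.
        }
        // 4. flip the GUC back to false in postgresql.conf and reload the config.
    }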
@@ -41,14 +41,14 @@ use crate::local_proxy;
 use crate::pg_helpers::*;
 use crate::spec::*;
 use crate::spec_apply::ApplySpecPhase::{
-    CreateAndAlterDatabases, CreateAndAlterRoles, CreateAvailabilityCheck, CreateSuperUser,
-    DropInvalidDatabases, DropRoles, HandleNeonExtension, HandleOtherExtensions,
-    RenameAndDeleteDatabases, RenameRoles, RunInEachDatabase,
+    CreateAndAlterDatabases, CreateAndAlterRoles, CreateAvailabilityCheck, CreateSchemaNeon,
+    CreateSuperUser, DropInvalidDatabases, DropRoles, FinalizeDropLogicalSubscriptions,
+    HandleNeonExtension, HandleOtherExtensions, RenameAndDeleteDatabases, RenameRoles,
+    RunInEachDatabase,
 };
 use crate::spec_apply::PerDatabasePhase;
 use crate::spec_apply::PerDatabasePhase::{
-    ChangeSchemaPerms, DeleteDBRoleReferences, DropSubscriptionsForDeletedDatabases,
-    HandleAnonExtension,
+    ChangeSchemaPerms, DeleteDBRoleReferences, DropLogicalSubscriptions, HandleAnonExtension,
 };
 use crate::spec_apply::{apply_operations, MutableApplyContext, DB};
 use crate::sync_sk::{check_if_synced, ping_safekeeper};
@@ -340,6 +340,15 @@ impl ComputeNode {
         self.state.lock().unwrap().status
     }
 
+    pub fn get_timeline_id(&self) -> Option<TimelineId> {
+        self.state
+            .lock()
+            .unwrap()
+            .pspec
+            .as_ref()
+            .map(|s| s.timeline_id)
+    }
+
     // Remove `pgdata` directory and create it again with right permissions.
     fn create_pgdata(&self) -> Result<()> {
         // Ignore removal error, likely it is a 'No such file or directory (os error 2)'.
@@ -929,6 +938,48 @@ impl ComputeNode {
             .map(|role| (role.name.clone(), role))
             .collect::<HashMap<String, Role>>();
 
+        // Check if we need to drop subscriptions before starting the endpoint.
+        //
+        // It is important to do this operation exactly once when the endpoint
+        // starts on a new branch. Otherwise, we may drop not inherited, but
+        // newly created subscriptions.
+        //
+        // We cannot rely only on the spec.drop_subscriptions_before_start flag,
+        // because if for some reason the compute restarts inside the VM,
+        // it will start again with the same spec and flag value.
+        //
+        // To handle this, we save the fact of the operation in the database
+        // in the neon.drop_subscriptions_done table.
+        // If the table does not exist, we assume that the operation was never
+        // performed, so we must do it.
+        // If the table exists, we check if the operation was performed on the
+        // current timeline.
+        //
+        let mut drop_subscriptions_done = false;
+
+        if spec.drop_subscriptions_before_start {
+            let timeline_id = self.get_timeline_id().context("timeline_id must be set")?;
+            let query = format!(
+                "select 1 from neon.drop_subscriptions_done where timeline_id = '{}'",
+                timeline_id
+            );
+
+            info!(
+                "Checking if drop subscription operation was already performed for timeline_id: {}",
+                timeline_id
+            );
+
+            drop_subscriptions_done = match client.simple_query(&query).await {
+                Ok(result) => matches!(&result[0], postgres::SimpleQueryMessage::Row(_)),
+                Err(e) => match e.code() {
+                    Some(&SqlState::UNDEFINED_TABLE) => false,
+                    _ => {
+                        // We don't expect any other error here, except for the
+                        // schema/table not existing
+                        error!("Error checking if drop subscription operation was already performed: {}", e);
+                        return Err(e.into());
+                    }
+                },
+            };
+        }
+
         let jwks_roles = Arc::new(
             spec.as_ref()
                 .local_proxy_config
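For comparison, the same existence check can be written with a parameterized query, which avoids hand-quoting timeline_id. A minimal sketch, assuming `client` is a tokio_postgres::Client (as the `simple_query(...).await` call above suggests); this is not the committed code:

    async fn drop_subscriptions_already_done(
        client: &tokio_postgres::Client,
        timeline_id: &str,
    ) -> Result<bool, tokio_postgres::Error> {
        use tokio_postgres::error::SqlState;
        match client
            .query_opt(
                "select 1 from neon.drop_subscriptions_done where timeline_id = $1",
                &[&timeline_id],
            )
            .await
        {
            Ok(row) => Ok(row.is_some()),
            // The schema/table may not exist yet on this timeline: treat as "not done".
            Err(e) if e.code() == Some(&SqlState::UNDEFINED_TABLE) => Ok(false),
            Err(e) => Err(e),
        }
    }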
@@ -996,7 +1047,7 @@ impl ComputeNode {
                     jwks_roles.clone(),
                     concurrency_token.clone(),
                     db,
-                    [DropSubscriptionsForDeletedDatabases].to_vec(),
+                    [DropLogicalSubscriptions].to_vec(),
                 );
 
                 Ok(spawn(fut))
@@ -1024,6 +1075,7 @@ impl ComputeNode {
             CreateAndAlterRoles,
             RenameAndDeleteDatabases,
             CreateAndAlterDatabases,
+            CreateSchemaNeon,
         ] {
             info!("Applying phase {:?}", &phase);
             apply_operations(
@@ -1064,6 +1116,17 @@ impl ComputeNode {
         }
 
         let conf = Arc::new(conf);
+        let mut phases = vec![
+            DeleteDBRoleReferences,
+            ChangeSchemaPerms,
+            HandleAnonExtension,
+        ];
+
+        if spec.drop_subscriptions_before_start && !drop_subscriptions_done {
+            info!("Adding DropLogicalSubscriptions phase because drop_subscriptions_before_start is set");
+            phases.push(DropLogicalSubscriptions);
+        }
+
         let fut = Self::apply_spec_sql_db(
             spec.clone(),
             conf,
@@ -1071,12 +1134,7 @@ impl ComputeNode {
             jwks_roles.clone(),
             concurrency_token.clone(),
             db,
-            [
-                DeleteDBRoleReferences,
-                ChangeSchemaPerms,
-                HandleAnonExtension,
-            ]
-            .to_vec(),
+            phases,
         );
 
         Ok(spawn(fut))
@@ -1088,12 +1146,20 @@ impl ComputeNode {
             handle.await??;
         }
 
-        for phase in vec![
+        let mut phases = vec![
             HandleOtherExtensions,
-            HandleNeonExtension,
+            HandleNeonExtension, // This step depends on CreateSchemaNeon
             CreateAvailabilityCheck,
             DropRoles,
-        ] {
+        ];
+
+        // This step depends on CreateSchemaNeon
+        if spec.drop_subscriptions_before_start && !drop_subscriptions_done {
+            info!("Adding FinalizeDropLogicalSubscriptions phase because drop_subscriptions_before_start is set");
+            phases.push(FinalizeDropLogicalSubscriptions);
+        }
+
+        for phase in phases {
             debug!("Applying phase {:?}", &phase);
             apply_operations(
                 spec.clone(),
@@ -1463,6 +1529,14 @@ impl ComputeNode {
                 Ok(())
             },
         )?;
 
+            let postgresql_conf_path = pgdata_path.join("postgresql.conf");
+            if config::line_in_file(
+                &postgresql_conf_path,
+                "neon.disable_logical_replication_subscribers=false",
+            )? {
+                info!("updated postgresql.conf to set neon.disable_logical_replication_subscribers=false");
+            }
+            self.pg_reload_conf()?;
         }
         self.post_apply_config()?;
@@ -129,6 +129,13 @@ pub fn write_postgres_conf(
 
     writeln!(file, "neon.extension_server_port={}", extension_server_port)?;
 
+    if spec.drop_subscriptions_before_start {
+        writeln!(file, "neon.disable_logical_replication_subscribers=true")?;
+    } else {
+        // be explicit about the default value
+        writeln!(file, "neon.disable_logical_replication_subscribers=false")?;
+    }
+
     // This is essential to keep this line at the end of the file,
     // because it is intended to override any settings above.
     writeln!(file, "include_if_exists = 'compute_ctl_temp_override.conf'")?;
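Two details carry the correctness here: for a duplicated setting, Postgres applies the last occurrence in postgresql.conf, and the include_if_exists line stays at the end so the temporary override file wins over everything above it. A minimal sketch of appending a setting so that it overrides earlier occurrences after a reload (append_conf_line is a hypothetical helper, not compute_ctl's config::line_in_file):

    use std::fs::OpenOptions;
    use std::io::Write;
    use std::path::Path;

    // Append a setting to postgresql.conf; for duplicated GUCs, Postgres
    // applies the last occurrence, so the appended line takes effect after
    // a config reload.
    fn append_conf_line(pgdata: &Path, line: &str) -> std::io::Result<()> {
        let mut f = OpenOptions::new()
            .append(true)
            .open(pgdata.join("postgresql.conf"))?;
        writeln!(f, "{}", line)
    }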
@@ -47,7 +47,7 @@ pub enum PerDatabasePhase {
     DeleteDBRoleReferences,
     ChangeSchemaPerms,
     HandleAnonExtension,
-    DropSubscriptionsForDeletedDatabases,
+    DropLogicalSubscriptions,
 }
 
 #[derive(Clone, Debug)]
@@ -58,11 +58,13 @@ pub enum ApplySpecPhase {
     CreateAndAlterRoles,
     RenameAndDeleteDatabases,
     CreateAndAlterDatabases,
+    CreateSchemaNeon,
     RunInEachDatabase { db: DB, subphase: PerDatabasePhase },
     HandleOtherExtensions,
     HandleNeonExtension,
     CreateAvailabilityCheck,
     DropRoles,
+    FinalizeDropLogicalSubscriptions,
 }
 
 pub struct Operation {
@@ -331,7 +333,7 @@ async fn get_operations<'a>(
             // NB: there could be other db states, which prevent us from dropping
             // the database. For example, if db is used by any active subscription
             // or replication slot.
-            // Such cases are handled in the DropSubscriptionsForDeletedDatabases
+            // Such cases are handled in the DropLogicalSubscriptions
             // phase. We do all the cleanup before actually dropping the database.
             let drop_db_query: String = format!(
                 "DROP DATABASE IF EXISTS {} WITH (FORCE)",
@@ -442,13 +444,19 @@ async fn get_operations<'a>(
 
             Ok(Box::new(operations))
         }
+        ApplySpecPhase::CreateSchemaNeon => Ok(Box::new(once(Operation {
+            query: String::from("CREATE SCHEMA IF NOT EXISTS neon"),
+            comment: Some(String::from(
+                "create schema for neon extension and utils tables",
+            )),
+        }))),
         ApplySpecPhase::RunInEachDatabase { db, subphase } => {
             match subphase {
-                PerDatabasePhase::DropSubscriptionsForDeletedDatabases => {
+                PerDatabasePhase::DropLogicalSubscriptions => {
                     match &db {
                         DB::UserDB(db) => {
                             let drop_subscription_query: String = format!(
-                                include_str!("sql/drop_subscription_for_drop_dbs.sql"),
+                                include_str!("sql/drop_subscriptions.sql"),
                                 datname_str = escape_literal(&db.name),
                             );
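A side note on the `format!(include_str!(...), ...)` idiom above: include_str! expands to a string literal at compile time, so the embedded SQL file can act as a format! template with named placeholders such as `{datname_str}`. A standalone sketch with an inline stand-in literal (the SQL text here is illustrative, not the contents of drop_subscriptions.sql):

    fn main() {
        let query = format!(
            // in the real code this literal comes from
            // include_str!("sql/drop_subscriptions.sql")
            "DO $$ BEGIN RAISE NOTICE 'dropping subscriptions in %', {datname_str}; END $$",
            datname_str = "'neondb'",
        );
        println!("{query}");
    }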
@@ -666,10 +674,6 @@ async fn get_operations<'a>(
         }
         ApplySpecPhase::HandleNeonExtension => {
             let operations = vec![
-                Operation {
-                    query: String::from("CREATE SCHEMA IF NOT EXISTS neon"),
-                    comment: Some(String::from("init: add schema for extension")),
-                },
                 Operation {
                     query: String::from("CREATE EXTENSION IF NOT EXISTS neon WITH SCHEMA neon"),
                     comment: Some(String::from(
@@ -712,5 +716,9 @@ async fn get_operations<'a>(
 
             Ok(Box::new(operations))
         }
+        ApplySpecPhase::FinalizeDropLogicalSubscriptions => Ok(Box::new(once(Operation {
+            query: String::from(include_str!("sql/finalize_drop_subscriptions.sql")),
+            comment: None,
+        }))),
     }
 }
compute_tools/src/sql/finalize_drop_subscriptions.sql (new file, 21 lines)
@@ -0,0 +1,21 @@
+DO $$
+BEGIN
+    IF NOT EXISTS(
+        SELECT 1
+        FROM pg_catalog.pg_tables
+        WHERE tablename = 'drop_subscriptions_done'
+        AND schemaname = 'neon'
+    )
+    THEN
+        CREATE TABLE neon.drop_subscriptions_done
+        (id serial primary key, timeline_id text);
+    END IF;
+
+    -- preserve the timeline_id of the last drop_subscriptions run
+    -- to ensure that the cleanup of a timeline is executed only once.
+    -- use upsert to avoid table bloat in case of cascading branching (a branch of a branch)
+    INSERT INTO neon.drop_subscriptions_done VALUES (1, current_setting('neon.timeline_id'))
+    ON CONFLICT (id) DO UPDATE
+    SET timeline_id = current_setting('neon.timeline_id');
+END
+$$
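Taken together with the startup check, this upsert gives an exactly-once guarantee per timeline: the single row, keyed by the constant id 1, names the last timeline that ran the cleanup, so a child branch, which inherits the parent's row, still runs the cleanup once and then overwrites the row. A toy Rust model of the protocol (not the real storage layer):

    use std::collections::HashMap;

    // `done` maps the constant key 1 to the timeline that last ran the cleanup.
    fn maybe_drop_subscriptions(done: &mut HashMap<u32, String>, current: &str) -> bool {
        let already_done = done.get(&1).map(String::as_str) == Some(current);
        if !already_done {
            // ... DROP SUBSCRIPTION for everything inherited ...
            done.insert(1, current.to_string()); // the ON CONFLICT upsert
        }
        !already_done // true means the cleanup ran on this call
    }

    fn main() {
        let mut done = HashMap::new();
        assert!(maybe_drop_subscriptions(&mut done, "aaa")); // parent: cleanup runs
        assert!(maybe_drop_subscriptions(&mut done, "bbb")); // child branch: runs once more
        assert!(!maybe_drop_subscriptions(&mut done, "bbb")); // restart with stale spec: skipped
    }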
@@ -1357,6 +1357,7 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
                 args.pg_version,
                 mode,
                 !args.update_catalog,
+                false,
             )?;
         }
         EndpointCmd::Start(args) => {
@@ -76,6 +76,7 @@ pub struct EndpointConf {
     http_port: u16,
     pg_version: u32,
     skip_pg_catalog_updates: bool,
+    drop_subscriptions_before_start: bool,
     features: Vec<ComputeFeature>,
 }
@@ -143,6 +144,7 @@ impl ComputeControlPlane {
         pg_version: u32,
         mode: ComputeMode,
         skip_pg_catalog_updates: bool,
+        drop_subscriptions_before_start: bool,
     ) -> Result<Arc<Endpoint>> {
         let pg_port = pg_port.unwrap_or_else(|| self.get_port());
         let http_port = http_port.unwrap_or_else(|| self.get_port() + 1);
@@ -162,6 +164,7 @@ impl ComputeControlPlane {
             // with this we basically test a case of waking up an idle compute, where
             // we also skip catalog updates in the cloud.
             skip_pg_catalog_updates,
+            drop_subscriptions_before_start,
             features: vec![],
         });
@@ -177,6 +180,7 @@ impl ComputeControlPlane {
                 pg_port,
                 pg_version,
                 skip_pg_catalog_updates,
+                drop_subscriptions_before_start,
                 features: vec![],
             })?,
         )?;
@@ -240,6 +244,7 @@ pub struct Endpoint {
     // Optimizations
     skip_pg_catalog_updates: bool,
 
+    drop_subscriptions_before_start: bool,
     // Feature flags
     features: Vec<ComputeFeature>,
 }
@@ -291,6 +296,7 @@ impl Endpoint {
             tenant_id: conf.tenant_id,
             pg_version: conf.pg_version,
             skip_pg_catalog_updates: conf.skip_pg_catalog_updates,
+            drop_subscriptions_before_start: conf.drop_subscriptions_before_start,
             features: conf.features,
         })
     }
@@ -625,6 +631,7 @@ impl Endpoint {
             shard_stripe_size: Some(shard_stripe_size),
             local_proxy_config: None,
             reconfigure_concurrency: 1,
+            drop_subscriptions_before_start: self.drop_subscriptions_before_start,
         };
         let spec_path = self.endpoint_path().join("spec.json");
         std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?;
@@ -138,6 +138,13 @@ pub struct ComputeSpec {
     /// enough spare connections for reconfiguration process to succeed.
     #[serde(default = "default_reconfigure_concurrency")]
     pub reconfigure_concurrency: usize,
+
+    /// If set to true, compute_ctl will drop all subscriptions before starting
+    /// the compute. This is needed when we start an endpoint on a branch, so
+    /// that the child does not compete with the parent branch's subscriptions
+    /// over the same replication content from the publisher.
+    #[serde(default)] // Default false
+    pub drop_subscriptions_before_start: bool,
 }
 
 /// Feature flag to signal `compute_ctl` to enable certain experimental functionality.
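The `#[serde(default)]` attribute is what keeps old spec files compatible: a spec.json without the key still deserializes, with the flag defaulting to false. A minimal sketch with a stand-in struct (assuming serde and serde_json as dependencies):

    use serde::Deserialize;

    #[derive(Deserialize, Debug)]
    struct SpecSketch {
        #[serde(default)] // missing key -> bool::default() == false
        drop_subscriptions_before_start: bool,
    }

    fn main() {
        let old: SpecSketch = serde_json::from_str("{}").unwrap();
        assert!(!old.drop_subscriptions_before_start);

        let new: SpecSketch =
            serde_json::from_str(r#"{"drop_subscriptions_before_start": true}"#).unwrap();
        assert!(new.drop_subscriptions_before_start);
    }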
@@ -19,6 +19,7 @@
 #include "access/xlogrecovery.h"
 #endif
 #include "replication/logical.h"
+#include "replication/logicallauncher.h"
 #include "replication/slot.h"
 #include "replication/walsender.h"
 #include "storage/proc.h"
@@ -434,6 +435,15 @@ _PG_init(void)
 
 	restore_running_xacts_callback = RestoreRunningXactsFromClog;
 
+	DefineCustomBoolVariable(
+		"neon.disable_logical_replication_subscribers",
+		"Disables incoming logical replication",
+		NULL,
+		&disable_logical_replication_subscribers,
+		false,
+		PGC_SIGHUP,
+		0,
+		NULL, NULL, NULL);
+
 	DefineCustomBoolVariable(
 		"neon.allow_replica_misconfig",
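PGC_SIGHUP is what makes the two-step dance above possible: the GUC can change on a configuration reload, without a postmaster restart, so compute_ctl can start with the setting on, drop the subscriptions, flip it off, and reload. A sketch of such a reload via pg_ctl (the binary being on PATH is an assumption):

    use std::process::Command;

    // `pg_ctl reload` sends SIGHUP to the postmaster, re-reading PGC_SIGHUP
    // GUCs such as neon.disable_logical_replication_subscribers.
    fn reload_postgres_conf(pgdata: &str) -> std::io::Result<()> {
        let status = Command::new("pg_ctl").args(["reload", "-D", pgdata]).status()?;
        assert!(status.success(), "pg_ctl reload failed");
        Ok(())
    }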
@@ -523,6 +523,7 @@ class NeonLocalCli(AbstractNeonCli):
         remote_ext_config: str | None = None,
         pageserver_id: int | None = None,
         allow_multiple: bool = False,
+        create_test_user: bool = False,
         basebackup_request_tries: int | None = None,
         env: dict[str, str] | None = None,
     ) -> subprocess.CompletedProcess[str]:
@@ -544,6 +545,8 @@ class NeonLocalCli(AbstractNeonCli):
             args.extend(["--pageserver-id", str(pageserver_id)])
         if allow_multiple:
             args.extend(["--allow-multiple"])
+        if create_test_user:
+            args.extend(["--create-test-user"])
 
         res = self.raw_cli(args, extra_env_vars)
         res.check_returncode()
@@ -3918,6 +3918,7 @@ class Endpoint(PgProtocol, LogUtils):
         pageserver_id: int | None = None,
         safekeepers: list[int] | None = None,
         allow_multiple: bool = False,
+        create_test_user: bool = False,
         basebackup_request_tries: int | None = None,
         env: dict[str, str] | None = None,
     ) -> Self:
@@ -3939,6 +3940,7 @@ class Endpoint(PgProtocol, LogUtils):
             remote_ext_config=remote_ext_config,
             pageserver_id=pageserver_id,
             allow_multiple=allow_multiple,
+            create_test_user=create_test_user,
             basebackup_request_tries=basebackup_request_tries,
             env=env,
         )
test_runner/regress/test_subscriber_branching.py (new file, 242 lines)
@@ -0,0 +1,242 @@
+from __future__ import annotations
+
+import time
+
+from fixtures.log_helper import log
+from fixtures.neon_fixtures import NeonEnv, logical_replication_sync
+from fixtures.utils import query_scalar, wait_until
+
+
+# This test checks that branching a timeline with logical subscriptions
+# does not affect logical replication for the parent.
+# An endpoint on a new branch will drop all existing subscriptions at start,
+# so it will not receive any changes.
+# If needed, the user can create new subscriptions on the child branch.
+def test_subscriber_branching(neon_simple_env: NeonEnv):
+    env = neon_simple_env
+    env.create_branch("publisher")
+    pub = env.endpoints.create("publisher")
+    pub.respec(
+        skip_pg_catalog_updates=False,
+        create_test_user=True,
+    )
+    pub.start(create_test_user=True)
+
+    env.create_branch("subscriber")
+    sub = env.endpoints.create("subscriber")
+    # Pass the create_test_user flag to get properly filled spec.users
+    # and spec.databases fields.
+    #
+    # This test checks the per-database operations that happen at compute start,
+    # and these operations are applied to the databases that are present in the spec.
+    sub.respec(
+        skip_pg_catalog_updates=False,
+        create_test_user=True,
+    )
+    sub.start(create_test_user=True)
+
+    pub.wait_for_migrations()
+    sub.wait_for_migrations()
+
+    n_records = 1000
+
+    def check_that_changes_propagated():
+        scur.execute("SELECT count(*) FROM t")
+        res = scur.fetchall()
+        assert res[0][0] == n_records
+
+    def insert_data(pub, start):
+        with pub.cursor(dbname="neondb", user="test", password="pubtestpwd") as pcur:
+            for i in range(start, start + n_records):
+                pcur.execute("INSERT into t values (%s,random()*100000)", (i,))
+
+    # create_test_user creates a user without a password,
+    # but psycopg2 execute() requires one
+    with sub.cursor() as scur:
+        scur.execute("ALTER USER test WITH PASSWORD 'testpwd'")
+    with pub.cursor() as pcur:
+        # Create a test user to avoid using superuser
+        pcur.execute("ALTER USER test WITH PASSWORD 'pubtestpwd'")
+    # If we don't do this, creating the subscription will fail
+    pub.edit_hba(["host all test 0.0.0.0/0 md5"])
+
+    with pub.cursor(dbname="neondb", user="test", password="pubtestpwd") as pcur:
+        pcur.execute("CREATE TABLE t (pk integer primary key, sk integer)")
+        pcur.execute("CREATE PUBLICATION pub FOR TABLE t")
+
+        with sub.cursor(dbname="neondb", user="test", password="testpwd") as scur:
+            scur.execute("CREATE TABLE t (pk integer primary key, sk integer)")
+            pub_conn = (
+                f"host=localhost port={pub.pg_port} dbname=neondb user=test password=pubtestpwd"
+            )
+            query = f"CREATE SUBSCRIPTION sub CONNECTION '{pub_conn}' PUBLICATION pub"
+            scur.execute(query)
+            time.sleep(2)  # let initial table sync complete
+
+        insert_data(pub, 0)
+
+        with sub.cursor(dbname="neondb", user="test", password="testpwd") as scur:
+            wait_until(check_that_changes_propagated)
+            latest_end_lsn = query_scalar(
+                scur, "select latest_end_lsn from pg_catalog.pg_stat_subscription; "
+            )
+            last_insert_lsn = query_scalar(scur, "select pg_current_wal_insert_lsn();")
+
+            log.info(f"latest_end_lsn = {latest_end_lsn}")
+            log.info(f"last_insert_lsn = {last_insert_lsn}")
+
+        # stop the parent subscriber so that it doesn't interfere with the test
+        sub.stop()
+
+        # 1. good scenario:
+        # create subscriber_child_1;
+        # it will not get changes from the publisher, because
+        # drop_subscriptions_before_start is set to True
+        sub_child_1_timeline_id = env.create_branch(
+            "subscriber_child_1",
+            ancestor_branch_name="subscriber",
+            ancestor_start_lsn=last_insert_lsn,
+        )
+        sub_child_1 = env.endpoints.create("subscriber_child_1")
+        # Pass the drop_subscriptions_before_start flag
+        sub_child_1.respec(
+            skip_pg_catalog_updates=False,
+            create_test_user=True,
+            drop_subscriptions_before_start=True,
+        )
+        sub_child_1.start(create_test_user=True)
+
+        # ensure that subscriber_child_1 sees all the data
+        with sub_child_1.cursor(dbname="neondb", user="test", password="testpwd") as scur:
+            scur.execute("SELECT count(*) FROM t")
+            res = scur.fetchall()
+            assert res[0][0] == n_records
+
+            # ensure that there are no subscriptions in this database
+            scur.execute("SELECT 1 FROM pg_catalog.pg_subscription WHERE subname = 'sub'")
+            assert len(scur.fetchall()) == 0
+
+        # ensure that drop_subscriptions_done happened on this timeline
+        with sub_child_1.cursor() as scur_postgres:
+            scur_postgres.execute("SELECT timeline_id from neon.drop_subscriptions_done")
+            res = scur_postgres.fetchall()
+            assert len(res) == 1
+            assert str(sub_child_1_timeline_id) == res[0][0]
+
+        old_n_records = n_records
+        # insert more data on the publisher
+        insert_data(pub, n_records)
+        n_records += n_records
+
+        pcur.execute("SELECT count(*) FROM t")
+        res = pcur.fetchall()
+        assert res[0][0] == n_records
+
+        # ensure that subscriber_child_1 doesn't see the new data
+        with sub_child_1.cursor(dbname="neondb", user="test", password="testpwd") as scur:
+            scur.execute("SELECT count(*) FROM t")
+            res = scur.fetchall()
+            assert res[0][0] == old_n_records
+
+        # re-enable logical replication on subscriber_child_1
+        # using a new publication and
+        # ensure that the new publication works as expected
+        with sub_child_1.cursor(dbname="neondb", user="test", password="testpwd") as scur:
+            scur.execute("TRUNCATE t")
+
+            # create a new subscription
+            # with a new pub name
+            pcur.execute("CREATE PUBLICATION pub_new FOR TABLE t")
+            query = f"CREATE SUBSCRIPTION sub_new CONNECTION '{pub_conn}' PUBLICATION pub_new"
+            scur.execute(query)
+
+            wait_until(check_that_changes_propagated)
+
+            scur.execute("SELECT count(*) FROM t")
+            res = scur.fetchall()
+            assert res[0][0] == n_records
+
+        # ensure that the new publication works as expected after compute restart:
+        # first restart with drop_subscriptions_before_start=True
+        # to emulate the case when the compute restarts within the VM with a stale spec
+        sub_child_1.stop()
+        sub_child_1.respec(
+            skip_pg_catalog_updates=False,
+            create_test_user=True,
+            drop_subscriptions_before_start=True,
+        )
+        sub_child_1.start(create_test_user=True)
+
+        with sub_child_1.cursor(dbname="neondb", user="test", password="testpwd") as scur:
+            # ensure that even though the flag is set, we didn't drop the new subscription
+            scur.execute("SELECT 1 FROM pg_catalog.pg_subscription WHERE subname = 'sub_new'")
+            assert len(scur.fetchall()) == 1
+
+        # ensure that drop_subscriptions_done happened on this timeline
+        with sub_child_1.cursor() as scur_postgres:
+            scur_postgres.execute("SELECT timeline_id from neon.drop_subscriptions_done")
+            res = scur_postgres.fetchall()
+            assert len(res) == 1
+            assert str(sub_child_1_timeline_id) == res[0][0]
+
+        sub_child_1.stop()
+        sub_child_1.respec(
+            skip_pg_catalog_updates=False,
+            create_test_user=True,
+            drop_subscriptions_before_start=False,
+        )
+        sub_child_1.start(create_test_user=True)
+
+        # insert more data on the publisher
+        insert_data(pub, n_records)
+        n_records += n_records
+        with sub_child_1.cursor(dbname="neondb", user="test", password="testpwd") as scur:
+            # ensure that there is a subscription in this database
+            scur.execute("SELECT 1 FROM pg_catalog.pg_subscription WHERE subname = 'sub_new'")
+            assert len(scur.fetchall()) == 1
+
+            wait_until(check_that_changes_propagated)
+            scur.execute("SELECT count(*) FROM t")
+            res = scur.fetchall()
+            assert res[0][0] == n_records
+
+        # ensure that drop_subscriptions_done happened on this timeline
+        with sub_child_1.cursor() as scur_postgres:
+            scur_postgres.execute("SELECT timeline_id from neon.drop_subscriptions_done")
+            res = scur_postgres.fetchall()
+            assert len(res) == 1
+            assert str(sub_child_1_timeline_id) == res[0][0]
+
+        # wake the sub and ensure that it catches up with the new data
+        sub.start(create_test_user=True)
+        with sub.cursor(dbname="neondb", user="test", password="testpwd") as scur:
+            logical_replication_sync(sub, pub)
+            wait_until(check_that_changes_propagated)
+            scur.execute("SELECT count(*) FROM t")
+            res = scur.fetchall()
+            assert res[0][0] == n_records
+
+        # test that we can create a branch of a branch
+        sub_child_2_timeline_id = env.create_branch(
+            "subscriber_child_2",
+            ancestor_branch_name="subscriber_child_1",
+        )
+        sub_child_2 = env.endpoints.create("subscriber_child_2")
+        # Pass the drop_subscriptions_before_start flag
+        sub_child_2.respec(
+            skip_pg_catalog_updates=False,
+            drop_subscriptions_before_start=True,
+        )
+        sub_child_2.start(create_test_user=True)
+
+        # ensure that subscriber_child_2 does not inherit the subscription from child_1
+        with sub_child_2.cursor(dbname="neondb", user="test", password="testpwd") as scur:
+            # ensure that there are no subscriptions in this database
+            scur.execute("SELECT count(*) FROM pg_catalog.pg_subscription")
+            res = scur.fetchall()
+            assert res[0][0] == 0
+
+        # ensure that drop_subscriptions_done happened on this timeline
+        with sub_child_2.cursor() as scur_postgres:
+            scur_postgres.execute("SELECT timeline_id from neon.drop_subscriptions_done")
+            res = scur_postgres.fetchall()
+            assert len(res) == 1
+            assert str(sub_child_2_timeline_id) == res[0][0]
Submodule vendor/postgres-v14 updated: 46082f2088...5f3b3afdd7
Submodule vendor/postgres-v15 updated: dd0b28d6fb...935292e883
Submodule vendor/postgres-v16 updated: d674efd776...061d563779
Submodule vendor/postgres-v17 updated: a8dd6e779d...4276717f6e
vendor/revisions.json (8 lines changed)
@@ -1,18 +1,18 @@
 {
     "v17": [
         "17.2",
-        "a8dd6e779dde907778006adb436b557ad652fb97"
+        "4276717f6e91023e504de355f4f21d4824074de8"
     ],
     "v16": [
         "16.6",
-        "d674efd776f59d78e8fa1535bd2f95c3e6984fca"
+        "061d56377961ba56998e41b7d5d5e975919ad301"
     ],
     "v15": [
         "15.10",
-        "dd0b28d6fbad39e227f3b77296fcca879af8b3a9"
+        "935292e883298187f112db6e9c7f765037ddcf64"
     ],
     "v14": [
         "14.15",
-        "46082f20884f087a2d974b33ac65d63af26142bd"
+        "5f3b3afdd7c24b4a0fd63ecb3288fab472fcc633"
     ]
 }