Compare commits

..

1 Commits

Author SHA1 Message Date
Arseny Sher
5fbaa3fd78 Remove LR files from basebackup when preparing standby.
Standby can't modify them as it doesn't write WAL.
2024-03-14 17:50:37 +03:00
3 changed files with 30 additions and 19 deletions

View File

@@ -683,6 +683,7 @@ impl ComputeNode {
ComputeMode::Primary => {}
ComputeMode::Replica | ComputeMode::Static(..) => {
add_standby_signal(pgdata_path)?;
remove_logrep_files(pgdata_path).context("remove_logrep_files")?;
}
}

View File

@@ -1,6 +1,5 @@
use std::collections::HashMap;
use std::fmt::Write;
use std::fs;
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::os::unix::fs::PermissionsExt;
@@ -8,6 +7,7 @@ use std::path::Path;
use std::process::Child;
use std::thread::JoinHandle;
use std::time::{Duration, Instant};
use std::{fs, io};
use anyhow::{bail, Result};
use ini::Ini;
@@ -366,6 +366,25 @@ pub fn create_pgdata(pgdata: &str) -> Result<()> {
Ok(())
}
/// Remove everything inside the given directory, leaving the directory itself
/// in place. The directory must exist.
///
/// Subdirectories are removed recursively; every other kind of entry (regular
/// file, symlink, ...) is unlinked. Deleting entries one by one with
/// `fs::remove_file` alone would fail on the first subdirectory encountered.
///
/// # Errors
///
/// Returns the first I/O error hit while listing the directory or deleting an
/// entry. Entries deleted before the failure stay deleted.
fn remove_dir_contents<P: AsRef<Path>>(path: P) -> io::Result<()> {
    for entry in fs::read_dir(path)? {
        let entry = entry?;
        // `DirEntry::file_type` does not follow symlinks, so a symlink that
        // points at a directory is unlinked rather than traversed.
        if entry.file_type()?.is_dir() {
            fs::remove_dir_all(entry.path())?;
        } else {
            fs::remove_file(entry.path())?;
        }
    }
    Ok(())
}
/// Strip logical replication state out of a basebackup prepared for a standby.
///
/// Logical replication slots and snapshot files are currently persisted on the
/// pageserver via logical replication messages, which a standby cannot write.
/// They are therefore deleted from the standby's data directory; among other
/// things this removes the noise from ls_monitor failing to drop them.
///
/// The contents of `pg_replslot`, `pg_logical/snapshots` and
/// `pg_logical/mappings` under `pgdata` are removed, in that order. The
/// directories themselves are kept, and each of them must already exist.
///
/// # Errors
///
/// Stops at and returns the first I/O error reported while clearing a
/// directory.
pub fn remove_logrep_files<P: AsRef<Path>>(pgdata: P) -> Result<()> {
    let base = pgdata.as_ref();
    for subdir in ["pg_replslot", "pg_logical/snapshots", "pg_logical/mappings"] {
        remove_dir_contents(base.join(subdir))?;
    }
    Ok(())
}
/// Update pgbouncer.ini with provided options
fn update_pgbouncer_ini(
pgbouncer_config: HashMap<String, String>,

View File

@@ -3983,27 +3983,18 @@ def check_restored_datadir_content(test_output_dir: Path, env: NeonEnv, endpoint
def logical_replication_sync(subscriber: VanillaPostgres, publisher: Endpoint) -> Lsn:
"""Wait logical replication subscriber to sync with publisher."""
def is_synced(publisher_lsn):
# Even if pg_stat_subscription.latest_end_lsn has caught up, some tables
# might not be synced yet, because the main apply worker keeps advancing
# until the sync workers finish.
rels_synced = subscriber.safe_psql(
"select count(*) = 0 from pg_subscription_rel where srsubstate != 'r'"
)[0][0]
log.info(f"number of not synced rels: {rels_synced}")
assert rels_synced
publisher_lsn = Lsn(publisher.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
while True:
res = subscriber.safe_psql("select latest_end_lsn from pg_catalog.pg_stat_subscription")[0][
0
]
log.info(f"subscriber_lsn={res}")
subscriber_lsn = Lsn(res)
log.info(f"Subscriber LSN={subscriber_lsn}, publisher LSN={ publisher_lsn}")
assert subscriber_lsn >= publisher_lsn
publisher_lsn = Lsn(publisher.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
wait_until(30, 0.5, partial(is_synced, publisher_lsn))
return publisher_lsn
if res:
log.info(f"subscriber_lsn={res}")
subscriber_lsn = Lsn(res)
log.info(f"Subscriber LSN={subscriber_lsn}, publisher LSN={ publisher_lsn}")
if subscriber_lsn >= publisher_lsn:
return subscriber_lsn
time.sleep(0.5)
def tenant_get_shards(