diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index e67430d061..9fd88e5818 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -252,7 +252,7 @@ fn create_neon_superuser(spec: &ComputeSpec, client: &mut Client) -> Result<()> IF NOT EXISTS ( SELECT FROM pg_catalog.pg_roles WHERE rolname = 'neon_superuser') THEN - CREATE ROLE neon_superuser CREATEDB CREATEROLE NOLOGIN IN ROLE pg_read_all_data, pg_write_all_data; + CREATE ROLE neon_superuser CREATEDB CREATEROLE NOLOGIN REPLICATION IN ROLE pg_read_all_data, pg_write_all_data; IF array_length(roles, 1) IS NOT NULL THEN EXECUTE format('GRANT neon_superuser TO %s', array_to_string(ARRAY(SELECT quote_ident(x) FROM unnest(roles) as x), ', ')); diff --git a/compute_tools/tests/pg_helpers_tests.rs b/compute_tools/tests/pg_helpers_tests.rs index 7d27d22a78..4961bc293d 100644 --- a/compute_tools/tests/pg_helpers_tests.rs +++ b/compute_tools/tests/pg_helpers_tests.rs @@ -28,7 +28,7 @@ mod pg_helpers_tests { assert_eq!( spec.cluster.settings.as_pg_settings(), r#"fsync = off -wal_level = replica +wal_level = logical hot_standby = on neon.safekeepers = '127.0.0.1:6502,127.0.0.1:6503,127.0.0.1:6501' wal_log_hints = on diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs index cba364c049..acd9061664 100644 --- a/control_plane/src/endpoint.rs +++ b/control_plane/src/endpoint.rs @@ -253,7 +253,7 @@ impl Endpoint { conf.append("shared_buffers", "1MB"); conf.append("fsync", "off"); conf.append("max_connections", "100"); - conf.append("wal_level", "replica"); + conf.append("wal_level", "logical"); // wal_sender_timeout is the maximum time to wait for WAL replication. // It also defines how often the walreciever will send a feedback message to the wal sender. 
conf.append("wal_sender_timeout", "5s"); diff --git a/docker-compose/compute_wrapper/var/db/postgres/specs/spec.json b/docker-compose/compute_wrapper/var/db/postgres/specs/spec.json index 565e5e368e..ccf0a91b90 100644 --- a/docker-compose/compute_wrapper/var/db/postgres/specs/spec.json +++ b/docker-compose/compute_wrapper/var/db/postgres/specs/spec.json @@ -25,7 +25,7 @@ }, { "name": "wal_level", - "value": "replica", + "value": "logical", "vartype": "enum" }, { diff --git a/libs/compute_api/tests/cluster_spec.json b/libs/compute_api/tests/cluster_spec.json index 96db13a5da..e2afa17ef0 100644 --- a/libs/compute_api/tests/cluster_spec.json +++ b/libs/compute_api/tests/cluster_spec.json @@ -76,7 +76,7 @@ }, { "name": "wal_level", - "value": "replica", + "value": "logical", "vartype": "enum" }, { diff --git a/libs/postgres_ffi/src/pg_constants.rs b/libs/postgres_ffi/src/pg_constants.rs index 9690dc0eb6..d59e0e4a15 100644 --- a/libs/postgres_ffi/src/pg_constants.rs +++ b/libs/postgres_ffi/src/pg_constants.rs @@ -220,6 +220,10 @@ pub const XLOG_CHECKPOINT_ONLINE: u8 = 0x10; pub const XLP_FIRST_IS_CONTRECORD: u16 = 0x0001; pub const XLP_LONG_HEADER: u16 = 0x0002; +/* From replication/slot.h */ +pub const REPL_SLOT_ON_DISK_OFFSETOF_RESTART_LSN: usize = 4*4 /* offset of `slotdata` in ReplicationSlotOnDisk */ + + 64 /* NameData */ + 4*4; + /* From fsm_internals.h */ const FSM_NODES_PER_PAGE: usize = BLCKSZ as usize - SIZEOF_PAGE_HEADER_DATA - 4; const FSM_NON_LEAF_NODES_PER_PAGE: usize = BLCKSZ as usize / 2 - 1; diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs index a959f1cddc..ed452eae7d 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -13,6 +13,7 @@ use anyhow::{anyhow, bail, ensure, Context}; use bytes::{BufMut, BytesMut}; use fail::fail_point; +use postgres_ffi::pg_constants; use std::fmt::Write as FmtWrite; use std::time::SystemTime; use tokio::io; @@ -180,6 +181,7 @@ where } } + let mut min_restart_lsn: Lsn = Lsn::MAX; // Create tablespace directories for ((spcnode, dbnode), has_relmap_file) in self.timeline.list_dbdirs(self.lsn, self.ctx).await? @@ -213,6 +215,34 @@ where self.add_rel(rel, rel).await?; } } + + for (path, content) in self.timeline.list_aux_files(self.lsn, self.ctx).await? 
{ + if path.starts_with("pg_replslot") { + let offs = pg_constants::REPL_SLOT_ON_DISK_OFFSETOF_RESTART_LSN; + let restart_lsn = Lsn(u64::from_le_bytes( + content[offs..offs + 8].try_into().unwrap(), + )); + info!("Replication slot {} restart LSN={}", path, restart_lsn); + min_restart_lsn = Lsn::min(min_restart_lsn, restart_lsn); + } + let header = new_tar_header(&path, content.len() as u64)?; + self.ar + .append(&header, &*content) + .await + .context("could not add aux file to basebackup tarball")?; + } + } + if min_restart_lsn != Lsn::MAX { + info!( + "Min restart LSN for logical replication is {}", + min_restart_lsn + ); + let data = min_restart_lsn.0.to_le_bytes(); + let header = new_tar_header("restart.lsn", data.len() as u64)?; + self.ar + .append(&header, &data[..]) + .await + .context("could not add restart.lsn file to basebackup tarball")?; } for xid in self .timeline diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 9a1281a522..ebba3d8579 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -499,6 +499,23 @@ impl Timeline { self.get(CHECKPOINT_KEY, lsn, ctx).await } + pub async fn list_aux_files( + &self, + lsn: Lsn, + ctx: &RequestContext, + ) -> Result<HashMap<String, Bytes>, PageReconstructError> { + match self.get(AUX_FILES_KEY, lsn, ctx).await { + Ok(buf) => match AuxFilesDirectory::des(&buf).context("deserialization failure") { + Ok(dir) => Ok(dir.files), + Err(e) => Err(PageReconstructError::from(e)), + }, + Err(e) => { + warn!("Failed to get info about AUX files: {}", e); + Ok(HashMap::new()) + } + } + } + /// Does the same as get_current_logical_size but counted on demand. /// Used to initialize the logical size tracking on startup. /// @@ -616,6 +633,7 @@ impl Timeline { result.add_key(CONTROLFILE_KEY); result.add_key(CHECKPOINT_KEY); + result.add_key(AUX_FILES_KEY); Ok(result.to_keyspace()) } @@ -692,6 +710,12 @@ impl<'a> DatadirModification<'a> { })?; self.put(DBDIR_KEY, Value::Image(buf.into())); + // Create AuxFilesDirectory + let buf = AuxFilesDirectory::ser(&AuxFilesDirectory { + files: HashMap::new(), + })?; + self.put(AUX_FILES_KEY, Value::Image(Bytes::from(buf))); + let buf = TwoPhaseDirectory::ser(&TwoPhaseDirectory { xids: HashSet::new(), })?; @@ -796,6 +820,12 @@ impl<'a> DatadirModification<'a> { // 'true', now write the updated 'dbdirs' map back. let buf = DbDirectory::ser(&dbdir)?; self.put(DBDIR_KEY, Value::Image(buf.into())); + + // Create AuxFilesDirectory as well + let buf = AuxFilesDirectory::ser(&AuxFilesDirectory { + files: HashMap::new(), + })?; + self.put(AUX_FILES_KEY, Value::Image(Bytes::from(buf))); } if r.is_none() { // Create RelDirectory @@ -1120,6 +1150,36 @@ impl<'a> DatadirModification<'a> { Ok(()) } + pub async fn put_file( + &mut self, + path: &str, + content: &[u8], + ctx: &RequestContext, + ) -> anyhow::Result<()> { + let mut dir = match self.get(AUX_FILES_KEY, ctx).await { + Ok(buf) => AuxFilesDirectory::des(&buf)?, + Err(e) => { + warn!("Failed to get info about AUX files: {}", e); + AuxFilesDirectory { + files: HashMap::new(), + } + } + }; + let path = path.to_string(); + if content.is_empty() { + dir.files.remove(&path); + } else { + dir.files.insert(path, Bytes::copy_from_slice(content)); + } + self.put( + AUX_FILES_KEY, + Value::Image(Bytes::from( + AuxFilesDirectory::ser(&dir).context("serialize")?, + )), + ); + Ok(()) + } + /// /// Flush changes accumulated so far to the underlying repository.
/// @@ -1255,6 +1315,11 @@ struct RelDirectory { rels: HashSet<(Oid, u8)>, } +#[derive(Debug, Serialize, Deserialize, Default)] +struct AuxFilesDirectory { + files: HashMap<String, Bytes>, +} + #[derive(Debug, Serialize, Deserialize)] struct RelSizeEntry { nblocks: u32, @@ -1303,10 +1368,12 @@ static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]); // 02 pg_twophase // // 03 misc -// controlfile +// Controlfile // checkpoint // pg_version // +// 04 aux files +// // Below is a full list of the keyspace allocation: // // DbDir: @@ -1344,6 +1411,11 @@ static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]); // // Checkpoint: // 03 00000000 00000000 00000000 00 00000001 +// +// AuxFiles: +// 03 00000000 00000000 00000000 00 00000002 +// + //-- Section 01: relation data and metadata const DBDIR_KEY: Key = Key { @@ -1567,6 +1639,15 @@ const CHECKPOINT_KEY: Key = Key { field6: 1, }; +const AUX_FILES_KEY: Key = Key { + field1: 0x03, + field2: 0, + field3: 0, + field4: 0, + field5: 0, + field6: 2, +}; + // Reverse mappings for a few Keys. // These are needed by WAL redo manager. diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index d290715938..fb1dbcd6ba 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -338,11 +338,20 @@ impl<'a> WalIngest<'a> { } else if decoded.xl_rmid == pg_constants::RM_LOGICALMSG_ID { let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; if info == pg_constants::XLOG_LOGICAL_MESSAGE { - // This is a convenient way to make the WAL ingestion pause at - // particular point in the WAL. For more fine-grained control, - // we could peek into the message and only pause if it contains - // a particular string, for example, but this is enough for now. - crate::failpoint_support::sleep_millis_async!("wal-ingest-logical-message-sleep"); + let xlrec = XlLogicalMessage::decode(&mut buf); + let prefix = std::str::from_utf8(&buf[0..xlrec.prefix_size - 1])?; + let message = &buf[xlrec.prefix_size..xlrec.prefix_size + xlrec.message_size]; + if prefix == "neon-test" { + // This is a convenient way to make the WAL ingestion pause at + // particular point in the WAL. For more fine-grained control, + // we could peek into the message and only pause if it contains + // a particular string, for example, but this is enough for now.
+ crate::failpoint_support::sleep_millis_async!( + "wal-ingest-logical-message-sleep" + ); + } else if let Some(path) = prefix.strip_prefix("neon-file:") { + modification.put_file(path, message, ctx).await?; + } } } @@ -459,7 +468,6 @@ impl<'a> WalIngest<'a> { } } else if info == pg_constants::XLOG_HEAP_DELETE { let xlrec = v14::XlHeapDelete::decode(buf); - assert_eq!(0, buf.remaining()); if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 { new_heap_blkno = Some(decoded.blocks[0].blkno); } @@ -527,7 +535,6 @@ impl<'a> WalIngest<'a> { } } else if info == pg_constants::XLOG_HEAP_DELETE { let xlrec = v15::XlHeapDelete::decode(buf); - assert_eq!(0, buf.remaining()); if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 { new_heap_blkno = Some(decoded.blocks[0].blkno); } @@ -595,7 +602,6 @@ impl<'a> WalIngest<'a> { } } else if info == pg_constants::XLOG_HEAP_DELETE { let xlrec = v16::XlHeapDelete::decode(buf); - assert_eq!(0, buf.remaining()); if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 { new_heap_blkno = Some(decoded.blocks[0].blkno); } @@ -771,7 +777,6 @@ impl<'a> WalIngest<'a> { } pg_constants::XLOG_NEON_HEAP_DELETE => { let xlrec = v16::rm_neon::XlNeonHeapDelete::decode(buf); - assert_eq!(0, buf.remaining()); if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 { new_heap_blkno = Some(decoded.blocks[0].blkno); } diff --git a/pageserver/src/walrecord.rs b/pageserver/src/walrecord.rs index 9c2e522f17..ff6bc9194b 100644 --- a/pageserver/src/walrecord.rs +++ b/pageserver/src/walrecord.rs @@ -748,6 +748,26 @@ impl XlMultiXactTruncate { } } +#[repr(C)] +#[derive(Debug)] +pub struct XlLogicalMessage { + pub db_id: Oid, + pub transactional: bool, + pub prefix_size: usize, + pub message_size: usize, +} + +impl XlLogicalMessage { + pub fn decode(buf: &mut Bytes) -> XlLogicalMessage { + XlLogicalMessage { + db_id: buf.get_u32_le(), + transactional: buf.get_u32_le() != 0, // 4-byte alignment + prefix_size: buf.get_u64_le() as usize, + message_size: buf.get_u64_le() as usize, + } + } +} + /// Main routine to decode a WAL record and figure out which blocks are modified // // See xlogrecord.h for details diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c index 5e172a0be4..3a841b04ec 100644 --- a/pgxn/neon/pagestore_smgr.c +++ b/pgxn/neon/pagestore_smgr.c @@ -63,7 +63,6 @@ #include "storage/md.h" #include "pgstat.h" - #if PG_VERSION_NUM >= 150000 #include "access/xlogutils.h" #include "access/xlogrecovery.h" @@ -1395,12 +1394,6 @@ neon_get_request_lsn(bool *latest, NRelFileInfo rinfo, ForkNumber forknum, Block elog(DEBUG1, "neon_get_request_lsn GetXLogReplayRecPtr %X/%X request lsn 0 ", (uint32) ((lsn) >> 32), (uint32) (lsn)); } - else if (am_walsender) - { - *latest = true; - lsn = InvalidXLogRecPtr; - elog(DEBUG1, "am walsender neon_get_request_lsn lsn 0 "); - } else { XLogRecPtr flushlsn; diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c index c1fd5e3ef3..10612e7e35 100644 --- a/pgxn/neon/walproposer.c +++ b/pgxn/neon/walproposer.c @@ -861,8 +861,30 @@ RecvVoteResponse(Safekeeper *sk) static void HandleElectedProposer(WalProposer *wp) { + FILE* f; + XLogRecPtr lrRestartLsn; + DetermineEpochStartLsn(wp); + /* + * If there are active logical replication subscriptions, we need + * to provide enough WAL for their WAL senders based on the position + * of their replication slots.
+ */ + f = fopen("restart.lsn", "rb"); + if (f != NULL && !wp->config->syncSafekeepers) + { + fread(&lrRestartLsn, sizeof(lrRestartLsn), 1, f); + fclose(f); + if (lrRestartLsn != InvalidXLogRecPtr) + { + elog(LOG, "Logical replication restart LSN %X/%X", LSN_FORMAT_ARGS(lrRestartLsn)); + /* start from the beginning of the segment to fetch page headers verified by XLogReader */ + lrRestartLsn = lrRestartLsn - XLogSegmentOffset(lrRestartLsn, wal_segment_size); + wp->truncateLsn = Min(wp->truncateLsn, lrRestartLsn); + } + } + /* * Check if not all safekeepers are up-to-date, we need to download WAL * needed to synchronize them diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index d9a75637b8..68e29523b0 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -3119,6 +3119,22 @@ def check_restored_datadir_content( assert (mismatch, error) == ([], []) +def logical_replication_sync(subscriber: VanillaPostgres, publisher: Endpoint) -> Lsn: + """Wait for the logical replication subscriber to sync with the publisher.""" + publisher_lsn = Lsn(publisher.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0]) + while True: + res = subscriber.safe_psql("select latest_end_lsn from pg_catalog.pg_stat_subscription")[0][ + 0 + ] + if res: + log.info(f"subscriber_lsn={res}") + subscriber_lsn = Lsn(res) + log.info(f"Subscriber LSN={subscriber_lsn}, publisher LSN={publisher_lsn}") + if subscriber_lsn >= publisher_lsn: + return subscriber_lsn + time.sleep(0.5) + + def wait_for_last_flush_lsn( env: NeonEnv, endpoint: Endpoint, diff --git a/test_runner/performance/test_logical_replication.py b/test_runner/performance/test_logical_replication.py new file mode 100644 index 0000000000..b799f7248f --- /dev/null +++ b/test_runner/performance/test_logical_replication.py @@ -0,0 +1,43 @@ +import time + +import pytest +from fixtures.log_helper import log +from fixtures.neon_fixtures import NeonEnv, PgBin, logical_replication_sync + + +@pytest.mark.timeout(1000) +def test_logical_replication(neon_simple_env: NeonEnv, pg_bin: PgBin, vanilla_pg): + env = neon_simple_env + + env.neon_cli.create_branch("test_logical_replication", "empty") + endpoint = env.endpoints.create_start("test_logical_replication") + + log.info("postgres is running on 'test_logical_replication' branch") + pg_bin.run_capture(["pgbench", "-i", "-s10", endpoint.connstr()]) + + endpoint.safe_psql("create publication pub1 for table pgbench_accounts, pgbench_history") + + # now start subscriber + vanilla_pg.start() + pg_bin.run_capture(["pgbench", "-i", "-s10", vanilla_pg.connstr()]) + + vanilla_pg.safe_psql("truncate table pgbench_accounts") + vanilla_pg.safe_psql("truncate table pgbench_history") + + connstr = endpoint.connstr().replace("'", "''") + print(f"connstr='{connstr}'") + vanilla_pg.safe_psql(f"create subscription sub1 connection '{connstr}' publication pub1") + + # Wait for the logical replication channel to be established + logical_replication_sync(vanilla_pg, endpoint) + + pg_bin.run_capture(["pgbench", "-c10", "-T100", "-Mprepared", endpoint.connstr()]) + + # Wait for logical replication to sync + start = time.time() + logical_replication_sync(vanilla_pg, endpoint) + log.info(f"Sync with master took {time.time() - start} seconds") + + sum_master = endpoint.safe_psql("select sum(abalance) from pgbench_accounts")[0][0] + sum_replica = vanilla_pg.safe_psql("select sum(abalance) from pgbench_accounts")[0][0] + assert sum_master == sum_replica diff --git
a/test_runner/regress/test_logical_replication.py b/test_runner/regress/test_logical_replication.py new file mode 100644 index 0000000000..726e5e5def --- /dev/null +++ b/test_runner/regress/test_logical_replication.py @@ -0,0 +1,149 @@ +import time + +from fixtures.log_helper import log +from fixtures.neon_fixtures import ( + NeonEnv, + logical_replication_sync, + wait_for_last_flush_lsn, +) + + +def test_logical_replication(neon_simple_env: NeonEnv, vanilla_pg): + env = neon_simple_env + + tenant_id = env.initial_tenant + timeline_id = env.neon_cli.create_branch("test_logical_replication", "empty") + endpoint = env.endpoints.create_start( + "test_logical_replication", config_lines=["log_statement=all"] + ) + + log.info("postgres is running on 'test_logical_replication' branch") + pg_conn = endpoint.connect() + cur = pg_conn.cursor() + + cur.execute("create table t(pk integer primary key, payload integer)") + cur.execute( + "CREATE TABLE replication_example(id SERIAL PRIMARY KEY, somedata int, text varchar(120));" + ) + cur.execute("create publication pub1 for table t, replication_example") + + # now start subscriber + vanilla_pg.start() + vanilla_pg.safe_psql("create table t(pk integer primary key, payload integer)") + vanilla_pg.safe_psql( + "CREATE TABLE replication_example(id SERIAL PRIMARY KEY, somedata int, text varchar(120), testcolumn1 int, testcolumn2 int, testcolumn3 int);" + ) + connstr = endpoint.connstr().replace("'", "''") + log.info(f"ep connstr is {endpoint.connstr()}, subscriber connstr {vanilla_pg.connstr()}") + vanilla_pg.safe_psql(f"create subscription sub1 connection '{connstr}' publication pub1") + + # Wait for the logical replication channel to be established + logical_replication_sync(vanilla_pg, endpoint) + + # insert some data + cur.execute("insert into t values (generate_series(1,1000), 0)") + + # Wait for logical replication to sync + logical_replication_sync(vanilla_pg, endpoint) + assert vanilla_pg.safe_psql("select count(*) from t")[0][0] == 1000 + + # now stop subscriber... + vanilla_pg.stop() + + # ... and insert some more data which should be delivered to the subscriber after restart + cur.execute("insert into t values (generate_series(1001,2000), 0)") + + # Restart compute + endpoint.stop() + endpoint.start() + + # start subscriber + vanilla_pg.start() + + # Wait for logical replication to sync + logical_replication_sync(vanilla_pg, endpoint) + + # Check that the subscriber receives all data + assert vanilla_pg.safe_psql("select count(*) from t")[0][0] == 2000 + + # Test that save/restore of RewriteMappingFile works. Partial copy of + # rewrite.sql test.
+ log.info("checking rewriteheap") + vanilla_pg.stop() + cmds = """ +INSERT INTO replication_example(somedata) VALUES (1); + +BEGIN; +INSERT INTO replication_example(somedata) VALUES (2); +ALTER TABLE replication_example ADD COLUMN testcolumn1 int; +INSERT INTO replication_example(somedata, testcolumn1) VALUES (3, 1); +COMMIT; + +BEGIN; +INSERT INTO replication_example(somedata) VALUES (3); +ALTER TABLE replication_example ADD COLUMN testcolumn2 int; +INSERT INTO replication_example(somedata, testcolumn1, testcolumn2) VALUES (4, 2, 1); +COMMIT; + +VACUUM FULL pg_am; +VACUUM FULL pg_amop; +VACUUM FULL pg_proc; +VACUUM FULL pg_opclass; +VACUUM FULL pg_type; +VACUUM FULL pg_index; +VACUUM FULL pg_database; + + +-- repeated rewrites that fail +BEGIN; +CLUSTER pg_class USING pg_class_oid_index; +CLUSTER pg_class USING pg_class_oid_index; +ROLLBACK; + +-- repeated rewrites that succeed +BEGIN; +CLUSTER pg_class USING pg_class_oid_index; +CLUSTER pg_class USING pg_class_oid_index; +CLUSTER pg_class USING pg_class_oid_index; +COMMIT; + +-- repeated rewrites in different transactions +VACUUM FULL pg_class; +VACUUM FULL pg_class; + +-- reindexing of important relations / indexes +REINDEX TABLE pg_class; +REINDEX INDEX pg_class_oid_index; +REINDEX INDEX pg_class_tblspc_relfilenode_index; + +INSERT INTO replication_example(somedata, testcolumn1) VALUES (5, 3); + +BEGIN; +INSERT INTO replication_example(somedata, testcolumn1) VALUES (6, 4); +ALTER TABLE replication_example ADD COLUMN testcolumn3 int; +INSERT INTO replication_example(somedata, testcolumn1, testcolumn3) VALUES (7, 5, 1); +COMMIT; +""" + endpoint.safe_psql_many([q for q in cmds.splitlines() if q != "" and not q.startswith("-")]) + + # refetch rewrite files from pageserver + endpoint.stop() + endpoint.start() + + vanilla_pg.start() + logical_replication_sync(vanilla_pg, endpoint) + eq_q = "select testcolumn1, testcolumn2, testcolumn3 from replication_example order by 1, 2, 3" + assert vanilla_pg.safe_psql(eq_q) == endpoint.safe_psql(eq_q) + log.info("rewriteheap synced") + + # test that removal of repl slots works across restart + vanilla_pg.stop() + time.sleep(1) # wait for conn termination; active slots can't be dropped + endpoint.safe_psql("select pg_drop_replication_slot('sub1');") + endpoint.safe_psql("insert into t values (2001, 1);") # forces WAL flush + # wait for drop message to reach safekeepers (it is not transactional) + wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id) + endpoint.stop() + endpoint.start() + # it must be gone (but walproposer slot still exists, hence 1) + assert endpoint.safe_psql("select count(*) from pg_replication_slots")[0][0] == 1 diff --git a/vendor/postgres-v14 b/vendor/postgres-v14 index 5d5cfee127..ebcca9e9eb 160000 --- a/vendor/postgres-v14 +++ b/vendor/postgres-v14 @@ -1 +1 @@ -Subproject commit 5d5cfee12783f0989a9c9fe13bb40b5585812568 +Subproject commit ebcca9e9eb49621b5b17247833b59e836337e8aa diff --git a/vendor/postgres-v15 b/vendor/postgres-v15 index 74cfe3e681..23f2d41102 160000 --- a/vendor/postgres-v15 +++ b/vendor/postgres-v15 @@ -1 +1 @@ -Subproject commit 74cfe3e681836747a31fdbd47bdd14b3d81b0772 +Subproject commit 23f2d411020a739375b32895ce1362ded2962084 diff --git a/vendor/postgres-v16 b/vendor/postgres-v16 index 389ce36b4b..e5e255d2da 160000 --- a/vendor/postgres-v16 +++ b/vendor/postgres-v16 @@ -1 +1 @@ -Subproject commit 389ce36b4b3da7aa654a25e1b3f10b641319a87f +Subproject commit e5e255d2da05bc5f884b871c042014030a114a9b diff --git a/vendor/revisions.json 
b/vendor/revisions.json index d08cb25f43..14ccbef287 100644 --- a/vendor/revisions.json +++ b/vendor/revisions.json @@ -1,5 +1,5 @@ { - "postgres-v16": "389ce36b4b3da7aa654a25e1b3f10b641319a87f", - "postgres-v15": "74cfe3e681836747a31fdbd47bdd14b3d81b0772", - "postgres-v14": "5d5cfee12783f0989a9c9fe13bb40b5585812568" + "postgres-v16": "e5e255d2da05bc5f884b871c042014030a114a9b", + "postgres-v15": "23f2d411020a739375b32895ce1362ded2962084", + "postgres-v14": "ebcca9e9eb49621b5b17247833b59e836337e8aa" }