Logical replication (#5271)
## Problem

See https://github.com/neondatabase/company_projects/issues/111

## Summary of changes

Save logical replication files to the WAL on the compute and include them in the basebackup served by the pageserver.

## Checklist before requesting a review

- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? If so, did you add the relevant metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with the /release-notes label and add several sentences in this section.

## Checklist before merging

- [ ] Do not forget to reformat the commit message so it does not include the above checklist.

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
Co-authored-by: Arseny Sher <sher-ars@yandex.ru>
Commit 5c88213eaf (parent 607d19f0e0), committed via GitHub.
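The mechanism this PR adds: the compute ships logical-replication state through WAL as logical-message records whose prefix starts with `neon-file:`, and the pageserver's WAL ingest stores the message body as an "aux file" keyed by the path (see the walingest and `put_file` hunks below); those aux files are then included in the basebackup. A minimal, self-contained Rust sketch of that dispatch — the names `AuxFiles` and `handle_logical_message` are illustrative, not the pageserver API:

```rust
use std::collections::HashMap;

/// Illustrative stand-in for the pageserver's aux-file directory
/// (the real one is AuxFilesDirectory, reached via DatadirModification).
#[derive(Default, Debug)]
struct AuxFiles {
    files: HashMap<String, Vec<u8>>,
}

impl AuxFiles {
    /// Mirrors the put_file semantics in the diff: empty content removes
    /// the entry, non-empty content inserts or overwrites it.
    fn put_file(&mut self, path: &str, content: &[u8]) {
        if content.is_empty() {
            self.files.remove(path);
        } else {
            self.files.insert(path.to_string(), content.to_vec());
        }
    }
}

/// Dispatch a decoded logical message the way the walingest hunk does:
/// a `neon-file:<path>` prefix means "store this payload as an aux file".
fn handle_logical_message(aux: &mut AuxFiles, prefix: &str, message: &[u8]) {
    if let Some(path) = prefix.strip_prefix("neon-file:") {
        aux.put_file(path, message);
    }
}

fn main() {
    let mut aux = AuxFiles::default();
    handle_logical_message(&mut aux, "neon-file:pg_replslot/sub1/state", b"slot bytes");
    assert!(aux.files.contains_key("pg_replslot/sub1/state"));
    // An empty payload deletes the file again.
    handle_logical_message(&mut aux, "neon-file:pg_replslot/sub1/state", b"");
    assert!(aux.files.is_empty());
}
```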
@@ -252,7 +252,7 @@ fn create_neon_superuser(spec: &ComputeSpec, client: &mut Client) -> Result<()>
     IF NOT EXISTS (
         SELECT FROM pg_catalog.pg_roles WHERE rolname = 'neon_superuser')
     THEN
-        CREATE ROLE neon_superuser CREATEDB CREATEROLE NOLOGIN IN ROLE pg_read_all_data, pg_write_all_data;
+        CREATE ROLE neon_superuser CREATEDB CREATEROLE NOLOGIN REPLICATION IN ROLE pg_read_all_data, pg_write_all_data;
     IF array_length(roles, 1) IS NOT NULL THEN
         EXECUTE format('GRANT neon_superuser TO %s',
             array_to_string(ARRAY(SELECT quote_ident(x) FROM unnest(roles) as x), ', '));
@@ -28,7 +28,7 @@ mod pg_helpers_tests {
         assert_eq!(
             spec.cluster.settings.as_pg_settings(),
             r#"fsync = off
-wal_level = replica
+wal_level = logical
 hot_standby = on
 neon.safekeepers = '127.0.0.1:6502,127.0.0.1:6503,127.0.0.1:6501'
 wal_log_hints = on
@@ -253,7 +253,7 @@ impl Endpoint {
         conf.append("shared_buffers", "1MB");
         conf.append("fsync", "off");
         conf.append("max_connections", "100");
-        conf.append("wal_level", "replica");
+        conf.append("wal_level", "logical");
         // wal_sender_timeout is the maximum time to wait for WAL replication.
         // It also defines how often the walreciever will send a feedback message to the wal sender.
         conf.append("wal_sender_timeout", "5s");
@@ -25,7 +25,7 @@
         },
         {
             "name": "wal_level",
-            "value": "replica",
+            "value": "logical",
             "vartype": "enum"
         },
         {
@@ -76,7 +76,7 @@
         },
         {
             "name": "wal_level",
-            "value": "replica",
+            "value": "logical",
             "vartype": "enum"
         },
         {
@@ -220,6 +220,10 @@ pub const XLOG_CHECKPOINT_ONLINE: u8 = 0x10;
 pub const XLP_FIRST_IS_CONTRECORD: u16 = 0x0001;
 pub const XLP_LONG_HEADER: u16 = 0x0002;
 
+/* From replication/slot.h */
+pub const REPL_SLOT_ON_DISK_OFFSETOF_RESTART_LSN: usize = 4*4 /* offset of `slotdata` in ReplicationSlotOnDisk */
+    + 64 /* NameData */ + 4*4;
+
 /* From fsm_internals.h */
 const FSM_NODES_PER_PAGE: usize = BLCKSZ as usize - SIZEOF_PAGE_HEADER_DATA - 4;
 const FSM_NON_LEAF_NODES_PER_PAGE: usize = BLCKSZ as usize / 2 - 1;
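The new constant above is plain byte arithmetic over PostgreSQL's `ReplicationSlotOnDisk` layout: 16 bytes of header before `slotdata`, then a 64-byte NameData and 16 more bytes before `restart_lsn`, i.e. offset 96. A hedged, std-only Rust sketch of how such an offset can be used to pull the restart LSN out of a slot's raw state buffer, assuming the same little-endian layout the basebackup hunk relies on (the function name is hypothetical):

```rust
/// Offset of `restart_lsn` inside ReplicationSlotOnDisk, as in the diff:
/// 16 bytes before `slotdata`, then a 64-byte NameData and 16 more bytes.
const REPL_SLOT_ON_DISK_OFFSETOF_RESTART_LSN: usize = 4 * 4 + 64 + 4 * 4; // = 96

/// Read the restart LSN (a little-endian u64) out of a raw slot state buffer.
/// Returns None if the buffer is too short.
fn restart_lsn_from_slot_state(state: &[u8]) -> Option<u64> {
    let offs = REPL_SLOT_ON_DISK_OFFSETOF_RESTART_LSN;
    let bytes: [u8; 8] = state.get(offs..offs + 8)?.try_into().ok()?;
    Some(u64::from_le_bytes(bytes))
}

fn main() {
    // Fabricated buffer: 96 bytes of header, then an LSN of 0x0000000001000028.
    let mut state = vec![0u8; 104];
    state[96..104].copy_from_slice(&0x0000_0000_0100_0028u64.to_le_bytes());
    assert_eq!(restart_lsn_from_slot_state(&state), Some(0x0000_0000_0100_0028));
}
```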
@@ -13,6 +13,7 @@
 use anyhow::{anyhow, bail, ensure, Context};
 use bytes::{BufMut, BytesMut};
 use fail::fail_point;
+use postgres_ffi::pg_constants;
 use std::fmt::Write as FmtWrite;
 use std::time::SystemTime;
 use tokio::io;
@@ -180,6 +181,7 @@ where
             }
         }
 
+        let mut min_restart_lsn: Lsn = Lsn::MAX;
         // Create tablespace directories
         for ((spcnode, dbnode), has_relmap_file) in
             self.timeline.list_dbdirs(self.lsn, self.ctx).await?
@@ -213,6 +215,34 @@ where
                 self.add_rel(rel, rel).await?;
             }
         }
+
+        for (path, content) in self.timeline.list_aux_files(self.lsn, self.ctx).await? {
+            if path.starts_with("pg_replslot") {
+                let offs = pg_constants::REPL_SLOT_ON_DISK_OFFSETOF_RESTART_LSN;
+                let restart_lsn = Lsn(u64::from_le_bytes(
+                    content[offs..offs + 8].try_into().unwrap(),
+                ));
+                info!("Replication slot {} restart LSN={}", path, restart_lsn);
+                min_restart_lsn = Lsn::min(min_restart_lsn, restart_lsn);
+            }
+            let header = new_tar_header(&path, content.len() as u64)?;
+            self.ar
+                .append(&header, &*content)
+                .await
+                .context("could not add aux file to basebackup tarball")?;
+        }
     }
+    if min_restart_lsn != Lsn::MAX {
+        info!(
+            "Min restart LSN for logical replication is {}",
+            min_restart_lsn
+        );
+        let data = min_restart_lsn.0.to_le_bytes();
+        let header = new_tar_header("restart.lsn", data.len() as u64)?;
+        self.ar
+            .append(&header, &data[..])
+            .await
+            .context("could not add restart.lsn file to basebackup tarball")?;
+    }
     for xid in self
         .timeline
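To make the control flow of the basebackup hunk above concrete: every aux file under `pg_replslot` contributes one restart LSN, the minimum wins, and that minimum is written into the tarball as an 8-byte little-endian `restart.lsn` member. A small stand-alone sketch of that fold, using plain `u64`s instead of the pageserver's `Lsn` type (the helper name is hypothetical):

```rust
/// Fold per-slot restart LSNs down to the minimum, as the basebackup hunk
/// does with Lsn::min, and serialize it as the 8-byte little-endian payload
/// of the `restart.lsn` tarball member.
fn restart_lsn_payload(slot_restart_lsns: &[u64]) -> Option<[u8; 8]> {
    let min = slot_restart_lsns.iter().copied().min()?;
    Some(min.to_le_bytes())
}

fn main() {
    // Two hypothetical slots; the older (smaller) LSN must win so that
    // enough WAL is retained for every subscription.
    let payload = restart_lsn_payload(&[0x1_0000_0060, 0x1_0000_0028]).unwrap();
    assert_eq!(u64::from_le_bytes(payload), 0x1_0000_0028);
    // No slots: no restart.lsn file is emitted (mirrors the Lsn::MAX check).
    assert!(restart_lsn_payload(&[]).is_none());
}
```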
@@ -499,6 +499,23 @@ impl Timeline {
         self.get(CHECKPOINT_KEY, lsn, ctx).await
     }
 
+    pub async fn list_aux_files(
+        &self,
+        lsn: Lsn,
+        ctx: &RequestContext,
+    ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
+        match self.get(AUX_FILES_KEY, lsn, ctx).await {
+            Ok(buf) => match AuxFilesDirectory::des(&buf).context("deserialization failure") {
+                Ok(dir) => Ok(dir.files),
+                Err(e) => Err(PageReconstructError::from(e)),
+            },
+            Err(e) => {
+                warn!("Failed to get info about AUX files: {}", e);
+                Ok(HashMap::new())
+            }
+        }
+    }
+
     /// Does the same as get_current_logical_size but counted on demand.
     /// Used to initialize the logical size tracking on startup.
     ///
@@ -616,6 +633,7 @@ impl Timeline {
 
         result.add_key(CONTROLFILE_KEY);
         result.add_key(CHECKPOINT_KEY);
+        result.add_key(AUX_FILES_KEY);
 
         Ok(result.to_keyspace())
     }
@@ -692,6 +710,12 @@ impl<'a> DatadirModification<'a> {
         })?;
         self.put(DBDIR_KEY, Value::Image(buf.into()));
+
+        // Create AuxFilesDirectory
+        let buf = AuxFilesDirectory::ser(&AuxFilesDirectory {
+            files: HashMap::new(),
+        })?;
+        self.put(AUX_FILES_KEY, Value::Image(Bytes::from(buf)));
 
         let buf = TwoPhaseDirectory::ser(&TwoPhaseDirectory {
             xids: HashSet::new(),
         })?;
@@ -796,6 +820,12 @@ impl<'a> DatadirModification<'a> {
             // 'true', now write the updated 'dbdirs' map back.
             let buf = DbDirectory::ser(&dbdir)?;
             self.put(DBDIR_KEY, Value::Image(buf.into()));
+
+            // Create AuxFilesDirectory as well
+            let buf = AuxFilesDirectory::ser(&AuxFilesDirectory {
+                files: HashMap::new(),
+            })?;
+            self.put(AUX_FILES_KEY, Value::Image(Bytes::from(buf)));
         }
         if r.is_none() {
             // Create RelDirectory
@@ -1120,6 +1150,36 @@ impl<'a> DatadirModification<'a> {
         Ok(())
     }
 
+    pub async fn put_file(
+        &mut self,
+        path: &str,
+        content: &[u8],
+        ctx: &RequestContext,
+    ) -> anyhow::Result<()> {
+        let mut dir = match self.get(AUX_FILES_KEY, ctx).await {
+            Ok(buf) => AuxFilesDirectory::des(&buf)?,
+            Err(e) => {
+                warn!("Failed to get info about AUX files: {}", e);
+                AuxFilesDirectory {
+                    files: HashMap::new(),
+                }
+            }
+        };
+        let path = path.to_string();
+        if content.is_empty() {
+            dir.files.remove(&path);
+        } else {
+            dir.files.insert(path, Bytes::copy_from_slice(content));
+        }
+        self.put(
+            AUX_FILES_KEY,
+            Value::Image(Bytes::from(
+                AuxFilesDirectory::ser(&dir).context("serialize")?,
+            )),
+        );
+        Ok(())
+    }
+
     ///
     /// Flush changes accumulated so far to the underlying repository.
     ///
@@ -1255,6 +1315,11 @@ struct RelDirectory {
     rels: HashSet<(Oid, u8)>,
 }
 
+#[derive(Debug, Serialize, Deserialize, Default)]
+struct AuxFilesDirectory {
+    files: HashMap<String, Bytes>,
+}
+
 #[derive(Debug, Serialize, Deserialize)]
 struct RelSizeEntry {
     nblocks: u32,
@@ -1303,10 +1368,12 @@ static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
 // 02 pg_twophase
 //
 // 03 misc
-// controlfile
+// Controlfile
 // checkpoint
 // pg_version
 //
+// 04 aux files
+//
 // Below is a full list of the keyspace allocation:
 //
 // DbDir:
@@ -1344,6 +1411,11 @@ static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
 //
 // Checkpoint:
 // 03 00000000 00000000 00000000 00 00000001
 //
+// AuxFiles:
+// 03 00000000 00000000 00000000 00 00000002
+//
 
 //-- Section 01: relation data and metadata
 
 const DBDIR_KEY: Key = Key {
@@ -1567,6 +1639,15 @@ const CHECKPOINT_KEY: Key = Key {
     field6: 1,
 };
 
+const AUX_FILES_KEY: Key = Key {
+    field1: 0x03,
+    field2: 0,
+    field3: 0,
+    field4: 0,
+    field5: 0,
+    field6: 2,
+};
+
 // Reverse mappings for a few Keys.
 // These are needed by WAL redo manager.
@@ -338,11 +338,20 @@ impl<'a> WalIngest<'a> {
         } else if decoded.xl_rmid == pg_constants::RM_LOGICALMSG_ID {
             let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
             if info == pg_constants::XLOG_LOGICAL_MESSAGE {
-                // This is a convenient way to make the WAL ingestion pause at
-                // particular point in the WAL. For more fine-grained control,
-                // we could peek into the message and only pause if it contains
-                // a particular string, for example, but this is enough for now.
-                crate::failpoint_support::sleep_millis_async!("wal-ingest-logical-message-sleep");
+                let xlrec = XlLogicalMessage::decode(&mut buf);
+                let prefix = std::str::from_utf8(&buf[0..xlrec.prefix_size - 1])?;
+                let message = &buf[xlrec.prefix_size..xlrec.prefix_size + xlrec.message_size];
+                if prefix == "neon-test" {
+                    // This is a convenient way to make the WAL ingestion pause at
+                    // particular point in the WAL. For more fine-grained control,
+                    // we could peek into the message and only pause if it contains
+                    // a particular string, for example, but this is enough for now.
+                    crate::failpoint_support::sleep_millis_async!(
+                        "wal-ingest-logical-message-sleep"
+                    );
+                } else if let Some(path) = prefix.strip_prefix("neon-file:") {
+                    modification.put_file(path, message, ctx).await?;
+                }
             }
         }
@@ -459,7 +468,6 @@ impl<'a> WalIngest<'a> {
             }
         } else if info == pg_constants::XLOG_HEAP_DELETE {
             let xlrec = v14::XlHeapDelete::decode(buf);
-            assert_eq!(0, buf.remaining());
             if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
                 new_heap_blkno = Some(decoded.blocks[0].blkno);
             }
@@ -527,7 +535,6 @@ impl<'a> WalIngest<'a> {
             }
         } else if info == pg_constants::XLOG_HEAP_DELETE {
             let xlrec = v15::XlHeapDelete::decode(buf);
-            assert_eq!(0, buf.remaining());
             if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
                 new_heap_blkno = Some(decoded.blocks[0].blkno);
             }
@@ -595,7 +602,6 @@ impl<'a> WalIngest<'a> {
             }
         } else if info == pg_constants::XLOG_HEAP_DELETE {
             let xlrec = v16::XlHeapDelete::decode(buf);
-            assert_eq!(0, buf.remaining());
             if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
                 new_heap_blkno = Some(decoded.blocks[0].blkno);
             }
@@ -771,7 +777,6 @@ impl<'a> WalIngest<'a> {
             }
             pg_constants::XLOG_NEON_HEAP_DELETE => {
                 let xlrec = v16::rm_neon::XlNeonHeapDelete::decode(buf);
-                assert_eq!(0, buf.remaining());
                 if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
                     new_heap_blkno = Some(decoded.blocks[0].blkno);
                 }
@@ -748,6 +748,26 @@ impl XlMultiXactTruncate {
     }
 }
 
+#[repr(C)]
+#[derive(Debug)]
+pub struct XlLogicalMessage {
+    pub db_id: Oid,
+    pub transactional: bool,
+    pub prefix_size: usize,
+    pub message_size: usize,
+}
+
+impl XlLogicalMessage {
+    pub fn decode(buf: &mut Bytes) -> XlLogicalMessage {
+        XlLogicalMessage {
+            db_id: buf.get_u32_le(),
+            transactional: buf.get_u32_le() != 0, // 4-bytes alignment
+            prefix_size: buf.get_u64_le() as usize,
+            message_size: buf.get_u64_le() as usize,
+        }
+    }
+}
+
 /// Main routine to decode a WAL record and figure out which blocks are modified
 //
 // See xlogrecord.h for details
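The header above is read field by field from the record body: two little-endian u32s (db_id and a 4-byte-aligned transactional flag), then two u64 sizes, followed by the prefix (NUL-terminated) and the message payload. A hedged, std-only sketch of that decoding against a plain byte slice — the real code uses the `bytes` crate's `get_u32_le`/`get_u64_le`, and `db_id` is an `Oid` rather than a bare `u32`:

```rust
#[derive(Debug, PartialEq)]
struct XlLogicalMessage {
    db_id: u32,
    transactional: bool,
    prefix_size: usize,
    message_size: usize,
}

/// Decode the fixed-size header: u32 db_id, u32 transactional flag
/// (padded to 4 bytes), u64 prefix_size, u64 message_size, all little-endian.
fn decode_logical_message(buf: &[u8]) -> Option<XlLogicalMessage> {
    let u32_at = |o: usize| Some(u32::from_le_bytes(buf.get(o..o + 4)?.try_into().ok()?));
    let u64_at = |o: usize| Some(u64::from_le_bytes(buf.get(o..o + 8)?.try_into().ok()?));
    Some(XlLogicalMessage {
        db_id: u32_at(0)?,
        transactional: u32_at(4)? != 0,
        prefix_size: u64_at(8)? as usize,
        message_size: u64_at(16)? as usize,
    })
}

fn main() {
    // 24-byte header followed by the prefix ("neon-file:x" plus NUL) and payload.
    let mut rec = Vec::new();
    rec.extend_from_slice(&0u32.to_le_bytes()); // db_id
    rec.extend_from_slice(&0u32.to_le_bytes()); // non-transactional
    rec.extend_from_slice(&12u64.to_le_bytes()); // prefix size incl. NUL
    rec.extend_from_slice(&5u64.to_le_bytes()); // message size
    rec.extend_from_slice(b"neon-file:x\0hello");
    let hdr = decode_logical_message(&rec).unwrap();
    assert_eq!(hdr.prefix_size, 12);
    // Strip the trailing NUL, as the walingest hunk does with prefix_size - 1.
    assert_eq!(&rec[24..24 + hdr.prefix_size - 1], b"neon-file:x");
}
```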
@@ -63,7 +63,6 @@
 #include "storage/md.h"
 #include "pgstat.h"
 
-
 #if PG_VERSION_NUM >= 150000
 #include "access/xlogutils.h"
 #include "access/xlogrecovery.h"
@@ -1395,12 +1394,6 @@ neon_get_request_lsn(bool *latest, NRelFileInfo rinfo, ForkNumber forknum, Block
         elog(DEBUG1, "neon_get_request_lsn GetXLogReplayRecPtr %X/%X request lsn 0 ",
              (uint32) ((lsn) >> 32), (uint32) (lsn));
     }
-    else if (am_walsender)
-    {
-        *latest = true;
-        lsn = InvalidXLogRecPtr;
-        elog(DEBUG1, "am walsender neon_get_request_lsn lsn 0 ");
-    }
     else
     {
         XLogRecPtr flushlsn;
@@ -861,8 +861,30 @@ RecvVoteResponse(Safekeeper *sk)
 static void
 HandleElectedProposer(WalProposer *wp)
 {
+    FILE* f;
+    XLogRecPtr lrRestartLsn;
+
     DetermineEpochStartLsn(wp);
 
+    /*
+     * If there are active logical replication subscription we need
+     * to provide enough WAL for their WAL senders based on th position
+     * of their replication slots.
+     */
+    f = fopen("restart.lsn", "rb");
+    if (f != NULL && !wp->config->syncSafekeepers)
+    {
+        fread(&lrRestartLsn, sizeof(lrRestartLsn), 1, f);
+        fclose(f);
+        if (lrRestartLsn != InvalidXLogRecPtr)
+        {
+            elog(LOG, "Logical replication restart LSN %X/%X", LSN_FORMAT_ARGS(lrRestartLsn));
+            /* start from the beginning of the segment to fetch page headers verifed by XLogReader */
+            lrRestartLsn = lrRestartLsn - XLogSegmentOffset(lrRestartLsn, wal_segment_size);
+            wp->truncateLsn = Min(wp->truncateLsn, lrRestartLsn);
+        }
+    }
+
     /*
      * Check if not all safekeepers are up-to-date, we need to download WAL
      * needed to synchronize them
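The only non-obvious arithmetic in the walproposer hunk above is the rounding: the restart LSN is snapped down to the start of its WAL segment so that XLogReader sees valid page headers from the segment boundary onward, and `truncateLsn` is then clamped to that value. A quick Rust sketch of the same computation, using modulo arithmetic (the C `XLogSegmentOffset` macro uses a bitmask, which is equivalent for the usual power-of-two segment sizes):

```rust
/// Round an LSN down to the start of its WAL segment, mirroring
/// `lsn - XLogSegmentOffset(lsn, wal_segment_size)` in the hunk above.
fn segment_start(lsn: u64, wal_segment_size: u64) -> u64 {
    lsn - (lsn % wal_segment_size)
}

fn main() {
    let seg = 16 * 1024 * 1024; // default 16 MB WAL segments
    // An LSN in the middle of segment 3 snaps back to the segment boundary.
    assert_eq!(segment_start(3 * seg + 0x1234, seg), 3 * seg);
    // truncateLsn is then clamped: Min(truncateLsn, segment_start(restart_lsn)).
    let truncate_lsn: u64 = 5 * seg;
    let clamped = truncate_lsn.min(segment_start(3 * seg + 0x1234, seg));
    assert_eq!(clamped, 3 * seg);
}
```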
@@ -3119,6 +3119,22 @@ def check_restored_datadir_content(
     assert (mismatch, error) == ([], [])
 
 
+def logical_replication_sync(subscriber: VanillaPostgres, publisher: Endpoint) -> Lsn:
+    """Wait logical replication subscriber to sync with publisher."""
+    publisher_lsn = Lsn(publisher.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
+    while True:
+        res = subscriber.safe_psql("select latest_end_lsn from pg_catalog.pg_stat_subscription")[0][
+            0
+        ]
+        if res:
+            log.info(f"subscriber_lsn={res}")
+            subscriber_lsn = Lsn(res)
+            log.info(f"Subscriber LSN={subscriber_lsn}, publisher LSN={ publisher_lsn}")
+            if subscriber_lsn >= publisher_lsn:
+                return subscriber_lsn
+        time.sleep(0.5)
+
+
 def wait_for_last_flush_lsn(
     env: NeonEnv,
     endpoint: Endpoint,
test_runner/performance/test_logical_replication.py — new file (43 lines)
@@ -0,0 +1,43 @@
+import time
+
+import pytest
+from fixtures.log_helper import log
+from fixtures.neon_fixtures import NeonEnv, PgBin, logical_replication_sync
+
+
+@pytest.mark.timeout(1000)
+def test_logical_replication(neon_simple_env: NeonEnv, pg_bin: PgBin, vanilla_pg):
+    env = neon_simple_env
+
+    env.neon_cli.create_branch("test_logical_replication", "empty")
+    endpoint = env.endpoints.create_start("test_logical_replication")
+
+    log.info("postgres is running on 'test_logical_replication' branch")
+    pg_bin.run_capture(["pgbench", "-i", "-s10", endpoint.connstr()])
+
+    endpoint.safe_psql("create publication pub1 for table pgbench_accounts, pgbench_history")
+
+    # now start subscriber
+    vanilla_pg.start()
+    pg_bin.run_capture(["pgbench", "-i", "-s10", vanilla_pg.connstr()])
+
+    vanilla_pg.safe_psql("truncate table pgbench_accounts")
+    vanilla_pg.safe_psql("truncate table pgbench_history")
+
+    connstr = endpoint.connstr().replace("'", "''")
+    print(f"connstr='{connstr}'")
+    vanilla_pg.safe_psql(f"create subscription sub1 connection '{connstr}' publication pub1")
+
+    # Wait logical replication channel to be established
+    logical_replication_sync(vanilla_pg, endpoint)
+
+    pg_bin.run_capture(["pgbench", "-c10", "-T100", "-Mprepared", endpoint.connstr()])
+
+    # Wait logical replication to sync
+    start = time.time()
+    logical_replication_sync(vanilla_pg, endpoint)
+    log.info(f"Sync with master took {time.time() - start} seconds")
+
+    sum_master = endpoint.safe_psql("select sum(abalance) from pgbench_accounts")[0][0]
+    sum_replica = vanilla_pg.safe_psql("select sum(abalance) from pgbench_accounts")[0][0]
+    assert sum_master == sum_replica
test_runner/regress/test_logical_replication.py — new file (149 lines)
@@ -0,0 +1,149 @@
+import time
+
+from fixtures.log_helper import log
+from fixtures.neon_fixtures import (
+    NeonEnv,
+    logical_replication_sync,
+    wait_for_last_flush_lsn,
+)
+
+
+def test_logical_replication(neon_simple_env: NeonEnv, vanilla_pg):
+    env = neon_simple_env
+
+    tenant_id = env.initial_tenant
+    timeline_id = env.neon_cli.create_branch("test_logical_replication", "empty")
+    endpoint = env.endpoints.create_start(
+        "test_logical_replication", config_lines=["log_statement=all"]
+    )
+
+    log.info("postgres is running on 'test_logical_replication' branch")
+    pg_conn = endpoint.connect()
+    cur = pg_conn.cursor()
+
+    cur.execute("create table t(pk integer primary key, payload integer)")
+    cur.execute(
+        "CREATE TABLE replication_example(id SERIAL PRIMARY KEY, somedata int, text varchar(120));"
+    )
+    cur.execute("create publication pub1 for table t, replication_example")
+
+    # now start subscriber
+    vanilla_pg.start()
+    vanilla_pg.safe_psql("create table t(pk integer primary key, payload integer)")
+    vanilla_pg.safe_psql(
+        "CREATE TABLE replication_example(id SERIAL PRIMARY KEY, somedata int, text varchar(120), testcolumn1 int, testcolumn2 int, testcolumn3 int);"
+    )
+    connstr = endpoint.connstr().replace("'", "''")
+    log.info(f"ep connstr is {endpoint.connstr()}, subscriber connstr {vanilla_pg.connstr()}")
+    vanilla_pg.safe_psql(f"create subscription sub1 connection '{connstr}' publication pub1")
+
+    # Wait logical replication channel to be established
+    logical_replication_sync(vanilla_pg, endpoint)
+
+    # insert some data
+    cur.execute("insert into t values (generate_series(1,1000), 0)")
+
+    # Wait logical replication to sync
+    logical_replication_sync(vanilla_pg, endpoint)
+    assert vanilla_pg.safe_psql("select count(*) from t")[0][0] == 1000
+
+    # now stop subscriber...
+    vanilla_pg.stop()
+
+    # ... and insert some more data which should be delivered to subscriber after restart
+    cur.execute("insert into t values (generate_series(1001,2000), 0)")
+
+    # Restart compute
+    endpoint.stop()
+    endpoint.start()
+
+    # start subscriber
+    vanilla_pg.start()
+
+    # Wait logical replication to sync
+    logical_replication_sync(vanilla_pg, endpoint)
+
+    # Check that subscribers receives all data
+    assert vanilla_pg.safe_psql("select count(*) from t")[0][0] == 2000
+
+    # Test that save/restore of RewriteMappingFile works. Partial copy of
+    # rewrite.sql test.
+    log.info("checking rewriteheap")
+    vanilla_pg.stop()
+    cmds = """
+INSERT INTO replication_example(somedata) VALUES (1);
+
+BEGIN;
+INSERT INTO replication_example(somedata) VALUES (2);
+ALTER TABLE replication_example ADD COLUMN testcolumn1 int;
+INSERT INTO replication_example(somedata, testcolumn1) VALUES (3, 1);
+COMMIT;
+
+BEGIN;
+INSERT INTO replication_example(somedata) VALUES (3);
+ALTER TABLE replication_example ADD COLUMN testcolumn2 int;
+INSERT INTO replication_example(somedata, testcolumn1, testcolumn2) VALUES (4, 2, 1);
+COMMIT;
+
+VACUUM FULL pg_am;
+VACUUM FULL pg_amop;
+VACUUM FULL pg_proc;
+VACUUM FULL pg_opclass;
+VACUUM FULL pg_type;
+VACUUM FULL pg_index;
+VACUUM FULL pg_database;
+
+
+-- repeated rewrites that fail
+BEGIN;
+CLUSTER pg_class USING pg_class_oid_index;
+CLUSTER pg_class USING pg_class_oid_index;
+ROLLBACK;
+
+-- repeated rewrites that succeed
+BEGIN;
+CLUSTER pg_class USING pg_class_oid_index;
+CLUSTER pg_class USING pg_class_oid_index;
+CLUSTER pg_class USING pg_class_oid_index;
+COMMIT;
+
+-- repeated rewrites in different transactions
+VACUUM FULL pg_class;
+VACUUM FULL pg_class;
+
+-- reindexing of important relations / indexes
+REINDEX TABLE pg_class;
+REINDEX INDEX pg_class_oid_index;
+REINDEX INDEX pg_class_tblspc_relfilenode_index;
+
+INSERT INTO replication_example(somedata, testcolumn1) VALUES (5, 3);
+
+BEGIN;
+INSERT INTO replication_example(somedata, testcolumn1) VALUES (6, 4);
+ALTER TABLE replication_example ADD COLUMN testcolumn3 int;
+INSERT INTO replication_example(somedata, testcolumn1, testcolumn3) VALUES (7, 5, 1);
+COMMIT;
+"""
+    endpoint.safe_psql_many([q for q in cmds.splitlines() if q != "" and not q.startswith("-")])
+
+    # refetch rewrite files from pageserver
+    endpoint.stop()
+    endpoint.start()
+
+    vanilla_pg.start()
+    logical_replication_sync(vanilla_pg, endpoint)
+    eq_q = "select testcolumn1, testcolumn2, testcolumn3 from replication_example order by 1, 2, 3"
+    assert vanilla_pg.safe_psql(eq_q) == endpoint.safe_psql(eq_q)
+    log.info("rewriteheap synced")
+
+    # test that removal of repl slots works across restart
+    vanilla_pg.stop()
+    time.sleep(1)  # wait for conn termination; active slots can't be dropped
+    endpoint.safe_psql("select pg_drop_replication_slot('sub1');")
+    endpoint.safe_psql("insert into t values (2001, 1);")  # forces WAL flush
+    # wait for drop message to reach safekeepers (it is not transactional)
+    wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
+    endpoint.stop()
+    endpoint.start()
+    # it must be gone (but walproposer slot still exists, hence 1)
+    assert endpoint.safe_psql("select count(*) from pg_replication_slots")[0][0] == 1
vendor/postgres-v14 (vendored submodule) updated: 5d5cfee127...ebcca9e9eb
vendor/postgres-v15 (vendored submodule) updated: 74cfe3e681...23f2d41102
vendor/postgres-v16 (vendored submodule) updated: 389ce36b4b...e5e255d2da
vendor/revisions.json (vendored, 6 lines changed)
@@ -1,5 +1,5 @@
 {
-    "postgres-v16": "389ce36b4b3da7aa654a25e1b3f10b641319a87f",
-    "postgres-v15": "74cfe3e681836747a31fdbd47bdd14b3d81b0772",
-    "postgres-v14": "5d5cfee12783f0989a9c9fe13bb40b5585812568"
+    "postgres-v16": "e5e255d2da05bc5f884b871c042014030a114a9b",
+    "postgres-v15": "23f2d411020a739375b32895ce1362ded2962084",
+    "postgres-v14": "ebcca9e9eb49621b5b17247833b59e836337e8aa"
 }