From 7006caf3a1480567e911169b4f9488ac2a81d699 Mon Sep 17 00:00:00 2001
From: Konstantin Knizhnik
Date: Mon, 3 Jun 2024 19:37:33 +0300
Subject: [PATCH] Store logical replication origin in KV storage (#7099)

Store logical replication origin in KV storage

## Problem

See #6977

## Summary of changes

* Extract origin_lsn from the commit WAL record
* Add a ReplOrigin key to KV storage and store origin_lsn under it
* In basebackup, replace the snapshot origin_lsn with the last committed origin_lsn at the basebackup LSN

## Checklist before requesting a review

- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? If so, did you add the relevant metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with the /release-notes label and add several sentences in this section.

## Checklist before merging

- [ ] Do not forget to reformat the commit message to not include the above checklist.

---------

Signed-off-by: Alex Chi Z
Co-authored-by: Konstantin Knizhnik
Co-authored-by: Alex Chi Z
---
 libs/pageserver_api/src/key.rs          | 35 ++++++++++++
 libs/postgres_ffi/build.rs              |  1 +
 libs/postgres_ffi/src/lib.rs            |  1 +
 libs/postgres_ffi/src/pg_constants.rs   | 10 +++-
 pageserver/src/basebackup.rs            | 33 +++++++++++
 pageserver/src/pgdatadir_mapping.rs     | 47 +++++++++++++--
 pageserver/src/tenant/timeline.rs       | 17 +++---
 pageserver/src/walingest.rs             | 20 +++++++
 pageserver/src/walrecord.rs             | 46 ++++++++++++++-
 test_runner/regress/test_compaction.py  |  6 +-
 .../regress/test_subscriber_restart.py  | 57 +++++++++++++++++++
 11 files changed, 255 insertions(+), 18 deletions(-)
 create mode 100644 test_runner/regress/test_subscriber_restart.py

diff --git a/libs/pageserver_api/src/key.rs b/libs/pageserver_api/src/key.rs
index 27fab5e7a0..997c1cc43a 100644
--- a/libs/pageserver_api/src/key.rs
+++ b/libs/pageserver_api/src/key.rs
@@ -1,6 +1,7 @@
 use anyhow::{bail, Result};
 use byteorder::{ByteOrder, BE};
 use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
+use postgres_ffi::RepOriginId;
 use postgres_ffi::{Oid, TransactionId};
 use serde::{Deserialize, Serialize};
 use std::{fmt, ops::Range};
@@ -38,6 +39,9 @@ pub const RELATION_SIZE_PREFIX: u8 = 0x61;
 /// The key prefix of AUX file keys.
 pub const AUX_KEY_PREFIX: u8 = 0x62;
 
+/// The key prefix of ReplOrigin keys.
+pub const REPL_ORIGIN_KEY_PREFIX: u8 = 0x63;
+
 /// Check if the key falls in the range of metadata keys.
 pub const fn is_metadata_key_slice(key: &[u8]) -> bool {
     key[0] >= METADATA_KEY_BEGIN_PREFIX && key[0] < METADATA_KEY_END_PREFIX
@@ -587,6 +591,37 @@ pub const AUX_FILES_KEY: Key = Key {
     field6: 2,
 };
 
+#[inline(always)]
+pub fn repl_origin_key(origin_id: RepOriginId) -> Key {
+    Key {
+        field1: REPL_ORIGIN_KEY_PREFIX,
+        field2: 0,
+        field3: 0,
+        field4: 0,
+        field5: 0,
+        field6: origin_id as u32,
+    }
+}
+
+/// Get the range of replorigin keys.
+pub fn repl_origin_key_range() -> Range<Key> {
+    Key {
+        field1: REPL_ORIGIN_KEY_PREFIX,
+        field2: 0,
+        field3: 0,
+        field4: 0,
+        field5: 0,
+        field6: 0,
+    }..Key {
+        field1: REPL_ORIGIN_KEY_PREFIX,
+        field2: 0,
+        field3: 0,
+        field4: 0,
+        field5: 0,
+        field6: 0x10000,
+    }
+}
+
 // Reverse mappings for a few Keys.
 // These are needed by WAL redo manager.
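A quick sketch of the key layout introduced above (illustration only, not part of the patch; the `Key` struct below is a minimal stand-in for `pageserver_api::key::Key`). It shows why `repl_origin_key_range()` ends at `field6 == 0x10000`: `RepOriginId` is a `u16`, so `0x10000` is one past the largest possible origin id and the half-open range covers every origin key. It also shows why `get_replorigins()` further down can recover the origin id from `field6` alone.

```rust
// Hypothetical stand-in for pageserver_api::key::Key, just enough for the sketch.
struct Key {
    field1: u8,
    field2: u32,
    field3: u32,
    field4: u32,
    field5: u8,
    field6: u32,
}

type RepOriginId = u16;
const REPL_ORIGIN_KEY_PREFIX: u8 = 0x63;

fn repl_origin_key(origin_id: RepOriginId) -> Key {
    Key {
        field1: REPL_ORIGIN_KEY_PREFIX,
        field2: 0,
        field3: 0,
        field4: 0,
        field5: 0,
        field6: origin_id as u32,
    }
}

fn main() {
    // The exclusive range end uses field6 = 0x10000, one past u16::MAX,
    // so even the largest origin id still falls inside the range.
    assert!(repl_origin_key(u16::MAX).field6 < 0x10000);
    // Decoding is the reverse mapping: read field6 back as the origin id.
    assert_eq!(repl_origin_key(42).field6 as RepOriginId, 42);
}
```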
diff --git a/libs/postgres_ffi/build.rs b/libs/postgres_ffi/build.rs
index 8e6761d6d3..370d9e9a6f 100644
--- a/libs/postgres_ffi/build.rs
+++ b/libs/postgres_ffi/build.rs
@@ -126,6 +126,7 @@ fn main() -> anyhow::Result<()> {
         .allowlist_type("PageHeaderData")
         .allowlist_type("DBState")
         .allowlist_type("RelMapFile")
+        .allowlist_type("RepOriginId")
         // Because structs are used for serialization, tell bindgen to emit
         // explicit padding fields.
         .explicit_padding(true)
diff --git a/libs/postgres_ffi/src/lib.rs b/libs/postgres_ffi/src/lib.rs
index 0d6986778a..729f57f829 100644
--- a/libs/postgres_ffi/src/lib.rs
+++ b/libs/postgres_ffi/src/lib.rs
@@ -110,6 +110,7 @@ pub mod pg_constants;
 pub mod relfile_utils;
 
 // Export some widely used datatypes that are unlikely to change across Postgres versions
+pub use v14::bindings::RepOriginId;
 pub use v14::bindings::{uint32, uint64, Oid};
 pub use v14::bindings::{BlockNumber, OffsetNumber};
 pub use v14::bindings::{MultiXactId, TransactionId};
diff --git a/libs/postgres_ffi/src/pg_constants.rs b/libs/postgres_ffi/src/pg_constants.rs
index 2701ddf5e0..54b032d138 100644
--- a/libs/postgres_ffi/src/pg_constants.rs
+++ b/libs/postgres_ffi/src/pg_constants.rs
@@ -102,7 +102,7 @@ pub const XACT_XINFO_HAS_SUBXACTS: u32 = 1u32 << 1;
 pub const XACT_XINFO_HAS_RELFILENODES: u32 = 1u32 << 2;
 pub const XACT_XINFO_HAS_INVALS: u32 = 1u32 << 3;
 pub const XACT_XINFO_HAS_TWOPHASE: u32 = 1u32 << 4;
-// pub const XACT_XINFO_HAS_ORIGIN: u32 = 1u32 << 5;
+pub const XACT_XINFO_HAS_ORIGIN: u32 = 1u32 << 5;
 // pub const XACT_XINFO_HAS_AE_LOCKS: u32 = 1u32 << 6;
 // pub const XACT_XINFO_HAS_GID: u32 = 1u32 << 7;
 
@@ -167,6 +167,7 @@ pub const RM_RELMAP_ID: u8 = 7;
 pub const RM_STANDBY_ID: u8 = 8;
 pub const RM_HEAP2_ID: u8 = 9;
 pub const RM_HEAP_ID: u8 = 10;
+pub const RM_REPLORIGIN_ID: u8 = 19;
 pub const RM_LOGICALMSG_ID: u8 = 21;
 
 // from neon_rmgr.h
@@ -223,6 +224,10 @@ pub const XLOG_CHECKPOINT_ONLINE: u8 = 0x10;
 pub const XLP_FIRST_IS_CONTRECORD: u16 = 0x0001;
 pub const XLP_LONG_HEADER: u16 = 0x0002;
 
+/* From replication/origin.h */
+pub const XLOG_REPLORIGIN_SET: u8 = 0x00;
+pub const XLOG_REPLORIGIN_DROP: u8 = 0x10;
+
 /* From replication/slot.h */
 pub const REPL_SLOT_ON_DISK_OFFSETOF_RESTART_LSN: usize = 4*4 /* offset of `slotdata` in ReplicationSlotOnDisk */ + 64 /* NameData */ + 4*4;
@@ -237,6 +242,9 @@ pub const SLOTS_PER_FSM_PAGE: u32 = FSM_LEAF_NODES_PER_PAGE as u32;
 pub const VM_HEAPBLOCKS_PER_PAGE: u32 = (BLCKSZ as usize - SIZEOF_PAGE_HEADER_DATA) as u32 * (8 / 2); // MAPSIZE * (BITS_PER_BYTE / BITS_PER_HEAPBLOCK)
 
+/* From origin.c */
+pub const REPLICATION_STATE_MAGIC: u32 = 0x1257DADE;
+
 // List of subdirectories inside pgdata.
 // Copied from src/bin/initdb/initdb.c
 pub const PGDATA_SUBDIRS: [&str; 22] = [
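Illustration only (not from the patch): how the new replorigin record constants above are meant to be consumed. PostgreSQL keeps the resource-manager-specific info bits in the high nibble of `xl_info` (`XLR_RMGR_INFO_MASK` is `0xF0` in `xlogrecord.h`), so a decoder masks before comparing against `XLOG_REPLORIGIN_SET` / `XLOG_REPLORIGIN_DROP` — the same pattern `walingest.rs` uses further down in this patch.

```rust
// Sketch of the dispatch on a replorigin record's xl_info byte.
const XLR_RMGR_INFO_MASK: u8 = 0xF0; // from PostgreSQL's xlogrecord.h
const XLOG_REPLORIGIN_SET: u8 = 0x00;
const XLOG_REPLORIGIN_DROP: u8 = 0x10;

fn classify_replorigin_record(xl_info: u8) -> &'static str {
    // The low nibble of xl_info is reserved for xlog insertion flags,
    // so only the masked high nibble identifies the record subtype.
    match xl_info & XLR_RMGR_INFO_MASK {
        XLOG_REPLORIGIN_SET => "set replication progress for an origin",
        XLOG_REPLORIGIN_DROP => "forget replication progress for an origin",
        _ => "unknown replorigin record",
    }
}

fn main() {
    assert_eq!(
        classify_replorigin_record(0x10),
        "forget replication progress for an origin"
    );
}
```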
diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs
index 31518f5632..0f057a4368 100644
--- a/pageserver/src/basebackup.rs
+++ b/pageserver/src/basebackup.rs
@@ -362,6 +362,13 @@ where
             ));
             info!("Replication slot {} restart LSN={}", path, restart_lsn);
             min_restart_lsn = Lsn::min(min_restart_lsn, restart_lsn);
+        } else if path == "pg_logical/replorigin_checkpoint" {
+            // replorigin_checkpoint is written only on compute shutdown, so it contains
+            // stale values. We therefore generate our own version of this file for the particular LSN,
+            // based on information about replication origins extracted from transaction commit records.
+            // In the future we will not generate an AUX record for "pg_logical/replorigin_checkpoint" at all,
+            // but for now we have to handle (skip) it for backward compatibility.
+            continue;
         }
         let header = new_tar_header(&path, content.len() as u64)?;
         self.ar
@@ -390,6 +397,32 @@ where
         {
             self.add_twophase_file(xid).await?;
         }
+        let repl_origins = self
+            .timeline
+            .get_replorigins(self.lsn, self.ctx)
+            .await
+            .map_err(|e| BasebackupError::Server(e.into()))?;
+        let n_origins = repl_origins.len();
+        if n_origins != 0 {
+            //
+            // Construct the "pg_logical/replorigin_checkpoint" file based on information about replication
+            // origins extracted from transaction commit records. We use this file to pass replication origin
+            // state to compute, so that logical replication can restart from the proper point.
+            //
+            let mut content = Vec::with_capacity(n_origins * 16 + 8);
+            content.extend_from_slice(&pg_constants::REPLICATION_STATE_MAGIC.to_le_bytes());
+            for (origin_id, origin_lsn) in repl_origins {
+                content.extend_from_slice(&origin_id.to_le_bytes());
+                content.extend_from_slice(&[0u8; 6]); // align to 8 bytes
+                content.extend_from_slice(&origin_lsn.0.to_le_bytes());
+            }
+            let crc32 = crc32c::crc32c(&content);
+            content.extend_from_slice(&crc32.to_le_bytes());
+            let header = new_tar_header("pg_logical/replorigin_checkpoint", content.len() as u64)?;
+            self.ar.append(&header, &*content).await.context(
+                "could not add pg_logical/replorigin_checkpoint file to basebackup tarball",
+            )?;
+        }
 
         fail_point!("basebackup-before-control-file", |_| {
             Err(BasebackupError::Server(anyhow!(
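Not part of the patch — a minimal sketch of the inverse operation, parsing a `replorigin_checkpoint` image with the layout the code above writes: a little-endian magic, then one 16-byte entry per origin (`u16` id, 6 padding bytes, `u64` LSN), then a crc32c of everything before the checksum. Note that `Vec::with_capacity(n_origins * 16 + 8)` accounts for exactly this: 4 bytes of magic plus 4 bytes of checksum around the entries. The `parse_replorigin_checkpoint` helper is hypothetical; it uses the same `crc32c` crate as the patch.

```rust
use std::collections::HashMap;

const REPLICATION_STATE_MAGIC: u32 = 0x1257DADE;

/// Parse a replorigin_checkpoint image as laid out by the basebackup code above.
fn parse_replorigin_checkpoint(buf: &[u8]) -> Result<HashMap<u16, u64>, String> {
    // Minimum size is magic + crc; entries are 16 bytes each.
    if buf.len() < 8 || (buf.len() - 8) % 16 != 0 {
        return Err("unexpected file size".into());
    }
    let (body, crc_bytes) = buf.split_at(buf.len() - 4);
    let crc = u32::from_le_bytes(crc_bytes.try_into().unwrap());
    if crc32c::crc32c(body) != crc {
        return Err("checksum mismatch".into());
    }
    let magic = u32::from_le_bytes(body[..4].try_into().unwrap());
    if magic != REPLICATION_STATE_MAGIC {
        return Err("bad magic".into());
    }
    let mut origins = HashMap::new();
    for entry in body[4..].chunks_exact(16) {
        let origin_id = u16::from_le_bytes(entry[..2].try_into().unwrap());
        // Bytes 2..8 are alignment padding; the LSN starts at offset 8.
        let origin_lsn = u64::from_le_bytes(entry[8..16].try_into().unwrap());
        origins.insert(origin_id, origin_lsn);
    }
    Ok(origins)
}

fn main() {
    // Round-trip a one-origin image built the way basebackup builds it.
    let mut content = Vec::new();
    content.extend_from_slice(&REPLICATION_STATE_MAGIC.to_le_bytes());
    content.extend_from_slice(&1u16.to_le_bytes());
    content.extend_from_slice(&[0u8; 6]);
    content.extend_from_slice(&0x0200_0188u64.to_le_bytes());
    let crc = crc32c::crc32c(&content);
    content.extend_from_slice(&crc.to_le_bytes());
    assert_eq!(parse_replorigin_checkpoint(&content).unwrap()[&1], 0x0200_0188);
}
```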
diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs
index 764c528a9e..5eaf80bdaf 100644
--- a/pageserver/src/pgdatadir_mapping.rs
+++ b/pageserver/src/pgdatadir_mapping.rs
@@ -18,16 +18,16 @@ use enum_map::Enum;
 use itertools::Itertools;
 use pageserver_api::key::{
     dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range, rel_size_to_key,
-    relmap_file_key, slru_block_to_key, slru_dir_to_key, slru_segment_key_range,
-    slru_segment_size_to_key, twophase_file_key, twophase_key_range, AUX_FILES_KEY, CHECKPOINT_KEY,
-    CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
+    relmap_file_key, repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
+    slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
+    AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
 };
 use pageserver_api::keyspace::SparseKeySpace;
 use pageserver_api::models::AuxFilePolicy;
 use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
 use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
 use postgres_ffi::BLCKSZ;
-use postgres_ffi::{Oid, TimestampTz, TransactionId};
+use postgres_ffi::{Oid, RepOriginId, TimestampTz, TransactionId};
 use serde::{Deserialize, Serialize};
 use std::collections::{hash_map, HashMap, HashSet};
 use std::ops::ControlFlow;
@@ -760,6 +760,27 @@ impl Timeline {
         }
     }
 
+    pub(crate) async fn get_replorigins(
+        &self,
+        lsn: Lsn,
+        ctx: &RequestContext,
+    ) -> Result<HashMap<RepOriginId, Lsn>, PageReconstructError> {
+        let kv = self
+            .scan(KeySpace::single(repl_origin_key_range()), lsn, ctx)
+            .await
+            .context("scan")?;
+        let mut result = HashMap::new();
+        for (k, v) in kv {
+            let v = v.context("get value")?;
+            let origin_id = k.field6 as RepOriginId;
+            let origin_lsn = Lsn::des(&v).unwrap();
+            if origin_lsn != Lsn::INVALID {
+                result.insert(origin_id, origin_lsn);
+            }
+        }
+        Ok(result)
+    }
+
     /// Does the same as get_current_logical_size but counted on demand.
     /// Used to initialize the logical size tracking on startup.
     ///
@@ -885,7 +906,9 @@ impl Timeline {
         Ok((
             result.to_keyspace(),
             /* AUX sparse key space */
-            SparseKeySpace(KeySpace::single(Key::metadata_aux_key_range())),
+            SparseKeySpace(KeySpace {
+                ranges: vec![repl_origin_key_range(), Key::metadata_aux_key_range()],
+            }),
         ))
     }
 
@@ -1154,6 +1177,20 @@ impl<'a> DatadirModification<'a> {
         Ok(())
     }
 
+    pub async fn set_replorigin(
+        &mut self,
+        origin_id: RepOriginId,
+        origin_lsn: Lsn,
+    ) -> anyhow::Result<()> {
+        let key = repl_origin_key(origin_id);
+        self.put(key, Value::Image(origin_lsn.ser().unwrap().into()));
+        Ok(())
+    }
+
+    pub async fn drop_replorigin(&mut self, origin_id: RepOriginId) -> anyhow::Result<()> {
+        self.set_replorigin(origin_id, Lsn::INVALID).await
+    }
+
     pub fn put_control_file(&mut self, img: Bytes) -> anyhow::Result<()> {
         self.put(CONTROLFILE_KEY, Value::Image(img));
         Ok(())
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 5402c776e3..35e6d1f92f 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -3879,22 +3879,25 @@ impl Timeline {
             return Err(FlushLayerError::Cancelled);
         }
 
+        // FIXME(auxfilesv2): supporting multiple metadata key partitions might need initdb support as well.
+        // This code path will not be hit during regression tests. After #7099 we have a single partition
+        // with two key ranges. If someone wants to fix the initdb optimization in the future, this might
+        // need to be fixed.
+
         // For metadata, always create delta layers.
         let delta_layer = if !metadata_partition.parts.is_empty() {
             assert_eq!(
                 metadata_partition.parts.len(),
                 1,
-                "currently sparse keyspace should only contain a single aux file keyspace"
+                "currently sparse keyspace should only contain a single metadata keyspace"
             );
             let metadata_keyspace = &metadata_partition.parts[0];
-            assert_eq!(
-                metadata_keyspace.0.ranges.len(),
-                1,
-                "aux file keyspace should be a single range"
-            );
             self.create_delta_layer(
                 &frozen_layer,
-                Some(metadata_keyspace.0.ranges[0].clone()),
+                Some(
+                    metadata_keyspace.0.ranges.first().unwrap().start
+                        ..metadata_keyspace.0.ranges.last().unwrap().end,
+                ),
                 ctx,
             )
             .await
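A quick illustration (not in the patch) of what the `ranges.first().start..ranges.last().end` change above computes: a single range covering all ranges of the metadata keyspace, under the assumption that `KeySpace` ranges are kept sorted and non-overlapping. `covering_range` is a hypothetical helper, with `u32` standing in for `Key`.

```rust
use std::ops::Range;

/// Collapse a sorted, non-overlapping list of ranges into one covering range,
/// the way flush now sizes the metadata delta layer.
fn covering_range(ranges: &[Range<u32>]) -> Option<Range<u32>> {
    Some(ranges.first()?.start..ranges.last()?.end)
}

fn main() {
    // After this patch the metadata partition holds two key ranges:
    // the replorigin range (prefix 0x63) and the aux-file range (prefix 0x62).
    let ranges = vec![0x6200..0x6300, 0x6300..0x6400];
    assert_eq!(covering_range(&ranges), Some(0x6200..0x6400));
    assert_eq!(covering_range(&[]), None);
}
```

The covering range may include keys that belong to neither input range; that is acceptable here because a delta layer's key range is an upper bound on what it contains, not an exact set.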
+ } + } _x => { // TODO: should probably log & fail here instead of blindly // doing something without understanding the protocol @@ -1178,6 +1192,7 @@ impl WalIngest { modification: &mut DatadirModification<'_>, parsed: &XlXactParsedRecord, is_commit: bool, + origin_id: u16, ctx: &RequestContext, ) -> anyhow::Result<()> { // Record update of CLOG pages @@ -1243,6 +1258,11 @@ impl WalIngest { } } } + if origin_id != 0 { + modification + .set_replorigin(origin_id, parsed.origin_lsn) + .await?; + } Ok(()) } diff --git a/pageserver/src/walrecord.rs b/pageserver/src/walrecord.rs index 02f6f49694..205f8dee4d 100644 --- a/pageserver/src/walrecord.rs +++ b/pageserver/src/walrecord.rs @@ -9,10 +9,10 @@ use postgres_ffi::pg_constants; use postgres_ffi::BLCKSZ; use postgres_ffi::{BlockNumber, TimestampTz}; use postgres_ffi::{MultiXactId, MultiXactOffset, MultiXactStatus, Oid, TransactionId}; -use postgres_ffi::{XLogRecord, XLOG_SIZE_OF_XLOG_RECORD}; +use postgres_ffi::{RepOriginId, XLogRecord, XLOG_SIZE_OF_XLOG_RECORD}; use serde::{Deserialize, Serialize}; use tracing::*; -use utils::bin_ser::DeserializeError; +use utils::{bin_ser::DeserializeError, lsn::Lsn}; /// Each update to a page is represented by a NeonWalRecord. It can be a wrapper /// around a PostgreSQL WAL record, or a custom neon-specific "record". @@ -116,6 +116,7 @@ pub struct DecodedWALRecord { pub blocks: Vec, pub main_data_offset: usize, + pub origin_id: u16, } #[repr(C)] @@ -573,6 +574,7 @@ pub struct XlXactParsedRecord { pub subxacts: Vec, pub xnodes: Vec, + pub origin_lsn: Lsn, } impl XlXactParsedRecord { @@ -651,6 +653,11 @@ impl XlXactParsedRecord { debug!("XLOG_XACT_COMMIT-XACT_XINFO_HAS_TWOPHASE xid {}", xid); } + let origin_lsn = if xinfo & pg_constants::XACT_XINFO_HAS_ORIGIN != 0 { + Lsn(buf.get_u64_le()) + } else { + Lsn::INVALID + }; XlXactParsedRecord { xid, info, @@ -660,6 +667,7 @@ impl XlXactParsedRecord { ts_id, subxacts, xnodes, + origin_lsn, } } } @@ -810,6 +818,36 @@ impl XlRunningXacts { } } +#[repr(C)] +#[derive(Debug)] +pub struct XlReploriginDrop { + pub node_id: RepOriginId, +} + +impl XlReploriginDrop { + pub fn decode(buf: &mut Bytes) -> XlReploriginDrop { + XlReploriginDrop { + node_id: buf.get_u16_le(), + } + } +} + +#[repr(C)] +#[derive(Debug)] +pub struct XlReploriginSet { + pub remote_lsn: Lsn, + pub node_id: RepOriginId, +} + +impl XlReploriginSet { + pub fn decode(buf: &mut Bytes) -> XlReploriginSet { + XlReploriginSet { + remote_lsn: Lsn(buf.get_u64_le()), + node_id: buf.get_u16_le(), + } + } +} + /// Main routine to decode a WAL record and figure out which blocks are modified // // See xlogrecord.h for details @@ -844,6 +882,7 @@ pub fn decode_wal_record( let mut rnode_dbnode: u32 = 0; let mut rnode_relnode: u32 = 0; let mut got_rnode = false; + let mut origin_id: u16 = 0; let mut buf = record.clone(); @@ -891,7 +930,7 @@ pub fn decode_wal_record( pg_constants::XLR_BLOCK_ID_ORIGIN => { // RepOriginId is uint16 - buf.advance(2); + origin_id = buf.get_u16_le(); } pg_constants::XLR_BLOCK_ID_TOPLEVEL_XID => { @@ -1088,6 +1127,7 @@ pub fn decode_wal_record( decoded.xl_info = xlogrec.xl_info; decoded.xl_rmid = xlogrec.xl_rmid; decoded.record = record; + decoded.origin_id = origin_id; decoded.main_data_offset = main_data_offset; Ok(()) diff --git a/test_runner/regress/test_compaction.py b/test_runner/regress/test_compaction.py index 9772e2d106..b2e4d35cb8 100644 --- a/test_runner/regress/test_compaction.py +++ b/test_runner/regress/test_compaction.py @@ -81,8 +81,10 @@ page_cache_size=10 non_vectored_sum 
diff --git a/test_runner/regress/test_compaction.py b/test_runner/regress/test_compaction.py
index 9772e2d106..b2e4d35cb8 100644
--- a/test_runner/regress/test_compaction.py
+++ b/test_runner/regress/test_compaction.py
@@ -81,8 +81,10 @@ page_cache_size=10
 
     non_vectored_sum = metrics.query_one("pageserver_layers_visited_per_read_global_sum")
     non_vectored_count = metrics.query_one("pageserver_layers_visited_per_read_global_count")
-    non_vectored_average = non_vectored_sum.value / non_vectored_count.value
-
+    if non_vectored_count.value != 0:
+        non_vectored_average = non_vectored_sum.value / non_vectored_count.value
+    else:
+        non_vectored_average = 0
     vectored_sum = metrics.query_one("pageserver_layers_visited_per_vectored_read_global_sum")
     vectored_count = metrics.query_one("pageserver_layers_visited_per_vectored_read_global_count")
     if vectored_count.value > 0:
diff --git a/test_runner/regress/test_subscriber_restart.py b/test_runner/regress/test_subscriber_restart.py
new file mode 100644
index 0000000000..d7f3962620
--- /dev/null
+++ b/test_runner/regress/test_subscriber_restart.py
@@ -0,0 +1,57 @@
+import threading
+import time
+
+from fixtures.neon_fixtures import NeonEnv
+from fixtures.utils import wait_until
+
+
+# This test checks that the logical replication subscriber correctly restarts replication
+# without receiving duplicates. It requires tracking replication origins on the pageserver side.
+def test_subscriber_restart(neon_simple_env: NeonEnv):
+    env = neon_simple_env
+    env.neon_cli.create_branch("publisher")
+    pub = env.endpoints.create("publisher")
+    pub.start()
+
+    env.neon_cli.create_branch("subscriber")
+    sub = env.endpoints.create("subscriber")
+    sub.start()
+
+    n_records = 100000
+    n_restarts = 100
+
+    def check_that_changes_propagated():
+        scur.execute("SELECT count(*) FROM t")
+        res = scur.fetchall()
+        assert res[0][0] == n_records
+
+    def insert_data(pub):
+        with pub.cursor() as pcur:
+            for i in range(0, n_records):
+                pcur.execute("INSERT into t values (%s,random()*100000)", (i,))
+
+    with pub.cursor() as pcur:
+        with sub.cursor() as scur:
+            pcur.execute("CREATE TABLE t (pk integer primary key, sk integer)")
+            pcur.execute("CREATE PUBLICATION pub FOR TABLE t")
+            scur.execute("CREATE TABLE t (pk integer primary key, sk integer)")
+            # scur.execute("CREATE INDEX on t(sk)")  # slow down applying WAL at the replica
+            pub_conn = f"host=localhost port={pub.pg_port} dbname=postgres user=cloud_admin"
+            query = f"CREATE SUBSCRIPTION sub CONNECTION '{pub_conn}' PUBLICATION pub"
+            scur.execute(query)
+            time.sleep(2)  # let initial table sync complete
+
+        thread = threading.Thread(target=insert_data, args=(pub,), daemon=True)
+        thread.start()
+
+        for _ in range(n_restarts):
+            # restart subscriber
+            # time.sleep(2)
+            sub.stop("immediate")
+            sub.start()
+
+        thread.join()
+        pcur.execute(f"INSERT into t values ({n_records}, 0)")
+        n_records += 1
+        with sub.cursor() as scur:
+            wait_until(10, 0.5, check_that_changes_propagated)