From 9bc12f7444e8a574d4ac5cfe2b213a91390342c8 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Tue, 26 Jul 2022 13:45:28 +0300 Subject: [PATCH] Move auto-generated 'bindings' to a separate inner module. Re-export only things that are used by other modules. In the future, I'm imagining that we run bindgen twice, for Postgres v14 and v15. The two sets of bindings would go into separate 'bindings_v14' and 'bindings_v15' modules. Rearrange postgres_ffi modules. Move function, to avoid Postgres version dependency in timelines.rs Move function to generate a logical-message WAL record to postgres_ffi. --- libs/postgres_ffi/src/controlfile_utils.rs | 2 +- libs/postgres_ffi/src/lib.rs | 59 +++++++-- libs/postgres_ffi/src/nonrelfile_utils.rs | 5 +- libs/postgres_ffi/src/pg_constants.rs | 9 +- libs/postgres_ffi/src/relfile_utils.rs | 2 +- libs/postgres_ffi/src/waldecoder.rs | 5 +- libs/postgres_ffi/src/xlog_utils.rs | 113 ++++++++++++++++-- libs/postgres_ffi/wal_craft/src/lib.rs | 4 +- pageserver/src/basebackup.rs | 23 ++-- pageserver/src/import_datadir.rs | 32 +++-- pageserver/src/keyspace.rs | 4 +- pageserver/src/layered_repository/timeline.rs | 2 +- pageserver/src/page_cache.rs | 2 +- pageserver/src/page_service.rs | 10 +- pageserver/src/pgdatadir_mapping.rs | 18 +-- pageserver/src/reltag.rs | 5 +- pageserver/src/timelines.rs | 14 +-- pageserver/src/walingest.rs | 16 +-- .../src/walreceiver/walreceiver_connection.rs | 2 +- pageserver/src/walrecord.rs | 17 ++- pageserver/src/walredo.rs | 21 ++-- safekeeper/src/handler.rs | 2 +- safekeeper/src/json_ctrl.rs | 97 +-------------- safekeeper/src/metrics.rs | 2 +- safekeeper/src/safekeeper.rs | 5 +- safekeeper/src/send_wal.rs | 2 +- safekeeper/src/timeline.rs | 3 +- safekeeper/src/wal_backup.rs | 3 +- safekeeper/src/wal_storage.rs | 10 +- 29 files changed, 265 insertions(+), 224 deletions(-) diff --git a/libs/postgres_ffi/src/controlfile_utils.rs b/libs/postgres_ffi/src/controlfile_utils.rs index 4df2342b90..0918d15001 
100644 --- a/libs/postgres_ffi/src/controlfile_utils.rs +++ b/libs/postgres_ffi/src/controlfile_utils.rs @@ -23,7 +23,7 @@ //! information. You can use PostgreSQL's pg_controldata utility to view its //! contents. //! -use crate::{ControlFileData, PG_CONTROL_FILE_SIZE}; +use super::bindings::{ControlFileData, PG_CONTROL_FILE_SIZE}; use anyhow::{bail, Result}; use bytes::{Bytes, BytesMut}; diff --git a/libs/postgres_ffi/src/lib.rs b/libs/postgres_ffi/src/lib.rs index 28d9a13dbf..022355329c 100644 --- a/libs/postgres_ffi/src/lib.rs +++ b/libs/postgres_ffi/src/lib.rs @@ -7,21 +7,62 @@ // https://github.com/rust-lang/rust-bindgen/issues/1651 #![allow(deref_nullptr)] -use serde::{Deserialize, Serialize}; use utils::lsn::Lsn; -include!(concat!(env!("OUT_DIR"), "/bindings.rs")); +macro_rules! postgres_ffi { + ($version:ident) => { + #[path = "."] + pub mod $version { + // fixme: does this have to be 'pub'? + pub mod bindings { + // bindgen generates bindings for a lot of stuff we don't need + #![allow(dead_code)] -pub mod controlfile_utils; -pub mod nonrelfile_utils; -pub mod pg_constants; -pub mod relfile_utils; -pub mod waldecoder; -pub mod xlog_utils; + use serde::{Deserialize, Serialize}; + include!(concat!(env!("OUT_DIR"), "/bindings.rs")); + } + pub mod controlfile_utils; + pub mod nonrelfile_utils; + pub mod pg_constants; + pub mod relfile_utils; + pub mod waldecoder; + pub mod xlog_utils; + + // Re-export some symbols from bindings + pub use bindings::DBState_DB_SHUTDOWNED; + pub use bindings::{CheckPoint, ControlFileData, XLogRecord}; + } + }; +} + +postgres_ffi!(v14); + +// Export some widely used datatypes that are unlikely to change across Postgres versions +pub use v14::bindings::{uint32, uint64, Oid}; +pub use v14::bindings::{BlockNumber, OffsetNumber}; +pub use v14::bindings::{MultiXactId, TransactionId}; + +// Likewise for these, although the assumption that these don't change is a little more iffy. 
+pub use v14::bindings::{MultiXactOffset, MultiXactStatus}; + +// from pg_config.h. These can be changed with configure options --with-blocksize=BLOCKSIZE and +// --with-segsize=SEGSIZE, but assume the defaults for now. +pub const BLCKSZ: u16 = 8192; +pub const RELSEG_SIZE: u32 = 1024 * 1024 * 1024 / (BLCKSZ as u32); +pub const XLOG_BLCKSZ: usize = 8192; + +// PG timeline is always 1, changing it doesn't have any useful meaning in Neon. +// +// NOTE: this is not to be confused with Neon timelines; different concept! +// +// It's a shaky assumption, that it's always 1. We might import a +// PostgreSQL data directory that has gone through timeline bumps, +// for example. FIXME later. +pub const PG_TLI: u32 = 1; // See TransactionIdIsNormal in transam.h pub const fn transaction_id_is_normal(id: TransactionId) -> bool { - id > pg_constants::FIRST_NORMAL_TRANSACTION_ID + id > v14::pg_constants::FIRST_NORMAL_TRANSACTION_ID } // See TransactionIdPrecedes in transam.c diff --git a/libs/postgres_ffi/src/nonrelfile_utils.rs b/libs/postgres_ffi/src/nonrelfile_utils.rs index b92207cd81..04ef346d88 100644 --- a/libs/postgres_ffi/src/nonrelfile_utils.rs +++ b/libs/postgres_ffi/src/nonrelfile_utils.rs @@ -1,11 +1,12 @@ //! //! Common utilities for dealing with PostgreSQL non-relation files. //! -use crate::{pg_constants, transaction_id_precedes}; +use crate::transaction_id_precedes; +use super::pg_constants; use bytes::BytesMut; use log::*; -use crate::MultiXactId; +use super::bindings::MultiXactId; pub fn transaction_id_set_status(xid: u32, status: u8, page: &mut BytesMut) { trace!( diff --git a/libs/postgres_ffi/src/pg_constants.rs b/libs/postgres_ffi/src/pg_constants.rs index 7230b841f5..42b5c5d842 100644 --- a/libs/postgres_ffi/src/pg_constants.rs +++ b/libs/postgres_ffi/src/pg_constants.rs @@ -7,7 +7,8 @@ //! comments on them. //! 
-use crate::PageHeaderData; +use super::bindings::PageHeaderData; +use crate::BLCKSZ; // // From pg_tablespace_d.h @@ -31,11 +32,6 @@ pub const SMGR_TRUNCATE_HEAP: u32 = 0x0001; pub const SMGR_TRUNCATE_VM: u32 = 0x0002; pub const SMGR_TRUNCATE_FSM: u32 = 0x0004; -// from pg_config.h. These can be changed with configure options --with-blocksize=BLOCKSIZE and -// --with-segsize=SEGSIZE, but assume the defaults for now. -pub const BLCKSZ: u16 = 8192; -pub const RELSEG_SIZE: u32 = 1024 * 1024 * 1024 / (BLCKSZ as u32); - // // From bufpage.h // @@ -213,7 +209,6 @@ pub const FIRST_NORMAL_OBJECT_ID: u32 = 16384; /* FIXME: pageserver should request wal_seg_size from compute node */ pub const WAL_SEGMENT_SIZE: usize = 16 * 1024 * 1024; -pub const XLOG_BLCKSZ: usize = 8192; pub const XLOG_CHECKPOINT_SHUTDOWN: u8 = 0x00; pub const XLOG_CHECKPOINT_ONLINE: u8 = 0x10; pub const XLP_LONG_HEADER: u16 = 0x0002; diff --git a/libs/postgres_ffi/src/relfile_utils.rs b/libs/postgres_ffi/src/relfile_utils.rs index cc9d6470c0..f3476acc9c 100644 --- a/libs/postgres_ffi/src/relfile_utils.rs +++ b/libs/postgres_ffi/src/relfile_utils.rs @@ -1,7 +1,7 @@ //! //! Common utilities for dealing with PostgreSQL relation files. //! -use crate::pg_constants; +use super::pg_constants; use once_cell::sync::OnceCell; use regex::Regex; diff --git a/libs/postgres_ffi/src/waldecoder.rs b/libs/postgres_ffi/src/waldecoder.rs index cbb761236c..0e1c9567cb 100644 --- a/libs/postgres_ffi/src/waldecoder.rs +++ b/libs/postgres_ffi/src/waldecoder.rs @@ -10,10 +10,7 @@ //! 
use super::pg_constants; use super::xlog_utils::*; -use super::XLogLongPageHeaderData; -use super::XLogPageHeaderData; -use super::XLogRecord; -use super::XLOG_PAGE_MAGIC; +use super::bindings::{XLogLongPageHeaderData, XLogPageHeaderData, XLogRecord, XLOG_PAGE_MAGIC}; use bytes::{Buf, BufMut, Bytes, BytesMut}; use crc32c::*; use log::*; diff --git a/libs/postgres_ffi/src/xlog_utils.rs b/libs/postgres_ffi/src/xlog_utils.rs index 956f53ce85..e7838c3f2c 100644 --- a/libs/postgres_ffi/src/xlog_utils.rs +++ b/libs/postgres_ffi/src/xlog_utils.rs @@ -7,22 +7,24 @@ // have been named the same as the corresponding PostgreSQL functions instead. // -use crate::pg_constants; -use crate::CheckPoint; -use crate::FullTransactionId; -use crate::XLogLongPageHeaderData; -use crate::XLogPageHeaderData; -use crate::XLogRecord; -use crate::XLOG_PAGE_MAGIC; +use crc32c::crc32c_append; -use crate::pg_constants::WAL_SEGMENT_SIZE; -use crate::waldecoder::WalStreamDecoder; +use super::bindings::{ + CheckPoint, FullTransactionId, XLogLongPageHeaderData, XLogPageHeaderData, XLogRecord, + XLOG_PAGE_MAGIC, +}; +use super::pg_constants; +use super::pg_constants::WAL_SEGMENT_SIZE; +use crate::v14::waldecoder::WalStreamDecoder; +use crate::PG_TLI; +use crate::{uint32, uint64, Oid}; use bytes::BytesMut; use bytes::{Buf, Bytes}; use log::*; +use serde::Serialize; use std::fs::File; use std::io::prelude::*; use std::io::ErrorKind; @@ -47,9 +49,6 @@ pub const XLOG_SIZE_OF_XLOG_RECORD: usize = std::mem::size_of::(); #[allow(clippy::identity_op)] pub const SIZE_OF_XLOG_RECORD_DATA_HEADER_SHORT: usize = 1 * 2; -// PG timeline is always 1, changing it doesn't have useful meaning in Zenith. 
-pub const PG_TLI: u32 = 1; - pub type XLogRecPtr = u64; pub type TimeLineID = u32; pub type TimestampTz = i64; @@ -346,6 +345,85 @@ pub fn generate_wal_segment(segno: u64, system_id: u64) -> Result Bytes { + use utils::bin_ser::LeSer; + self.ser().unwrap().into() + } +} + +/// Create new WAL record for non-transactional logical message. +/// Used for creating artificial WAL for tests, as LogicalMessage +/// record is basically no-op. +/// +/// NOTE: This leaves the xl_prev field zero. The safekeeper and +/// pageserver tolerate that, but PostgreSQL does not. +pub fn encode_logical_message(prefix: &str, message: &str) -> Vec { + let mut prefix_bytes: Vec = Vec::with_capacity(prefix.len() + 1); + prefix_bytes.write_all(prefix.as_bytes()).unwrap(); + prefix_bytes.push(0); + + let message_bytes = message.as_bytes(); + + let logical_message = XlLogicalMessage { + db_id: 0, + transactional: 0, + prefix_size: prefix_bytes.len() as u64, + message_size: message_bytes.len() as u64, + }; + + let mainrdata = logical_message.encode(); + let mainrdata_len: usize = mainrdata.len() + prefix_bytes.len() + message_bytes.len(); + // only short mainrdata is supported for now + assert!(mainrdata_len <= 255); + let mainrdata_len = mainrdata_len as u8; + + let mut data: Vec = vec![pg_constants::XLR_BLOCK_ID_DATA_SHORT, mainrdata_len]; + data.extend_from_slice(&mainrdata); + data.extend_from_slice(&prefix_bytes); + data.extend_from_slice(message_bytes); + + let total_len = XLOG_SIZE_OF_XLOG_RECORD + data.len(); + + let mut header = XLogRecord { + xl_tot_len: total_len as u32, + xl_xid: 0, + xl_prev: 0, + xl_info: 0, + xl_rmid: 21, + __bindgen_padding_0: [0u8; 2usize], + xl_crc: 0, // crc will be calculated later + }; + + let header_bytes = header.encode().expect("failed to encode header"); + let crc = crc32c_append(0, &data); + let crc = crc32c_append(crc, &header_bytes[0..XLOG_RECORD_CRC_OFFS]); + header.xl_crc = crc; + + let mut wal: Vec = Vec::new(); + 
wal.extend_from_slice(&header.encode().expect("failed to encode header")); + wal.extend_from_slice(&data); + + // WAL start position must be aligned at 8 bytes, + // this will add padding for the next WAL record. + const PADDING: usize = 8; + let padding_rem = wal.len() % PADDING; + if padding_rem != 0 { + wal.resize(wal.len() + PADDING - padding_rem, 0); + } + + wal +} + #[cfg(test)] mod tests { use super::*; @@ -547,4 +625,15 @@ mod tests { checkpoint.update_next_xid(1024); assert_eq!(checkpoint.nextXid.value, 2048); } + + #[test] + pub fn test_encode_logical_message() { + let expected = [ + 64, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 21, 0, 0, 170, 34, 166, 227, 255, + 38, 0, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 112, 114, + 101, 102, 105, 120, 0, 109, 101, 115, 115, 97, 103, 101, + ]; + let actual = encode_logical_message("prefix", "message"); + assert_eq!(expected, actual[..]); + } } diff --git a/libs/postgres_ffi/wal_craft/src/lib.rs b/libs/postgres_ffi/wal_craft/src/lib.rs index e3b666da41..6ac5afb27f 100644 --- a/libs/postgres_ffi/wal_craft/src/lib.rs +++ b/libs/postgres_ffi/wal_craft/src/lib.rs @@ -4,8 +4,8 @@ use log::*; use once_cell::sync::Lazy; use postgres::types::PgLsn; use postgres::Client; -use postgres_ffi::pg_constants::WAL_SEGMENT_SIZE; -use postgres_ffi::xlog_utils::{ +use postgres_ffi::v14::pg_constants::WAL_SEGMENT_SIZE; +use postgres_ffi::v14::xlog_utils::{ XLOG_BLCKSZ, XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD, }; use std::cmp::Ordering; diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs index 5837447ce8..33f072553f 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -24,8 +24,13 @@ use tracing::*; use crate::reltag::{RelTag, SlruKind}; use crate::DatadirTimeline; -use postgres_ffi::xlog_utils::*; -use postgres_ffi::*; + +use postgres_ffi::v14::pg_constants; +use postgres_ffi::v14::xlog_utils::{generate_wal_segment, normalize_lsn, 
XLogFileName}; +use postgres_ffi::v14::{CheckPoint, ControlFileData}; +use postgres_ffi::TransactionId; +use postgres_ffi::PG_TLI; +use postgres_ffi::{BLCKSZ, RELSEG_SIZE}; use utils::lsn::Lsn; /// This is short-living object only for the time of tarball creation, @@ -200,7 +205,7 @@ where } // Add a file for each chunk of blocks (aka segment) - let chunks = (0..nblocks).chunks(pg_constants::RELSEG_SIZE as usize); + let chunks = (0..nblocks).chunks(RELSEG_SIZE as usize); for (seg, blocks) in chunks.into_iter().enumerate() { let mut segment_data: Vec = vec![]; for blknum in blocks { @@ -220,23 +225,19 @@ where fn add_slru_segment(&mut self, slru: SlruKind, segno: u32) -> anyhow::Result<()> { let nblocks = self.timeline.get_slru_segment_size(slru, segno, self.lsn)?; - let mut slru_buf: Vec = - Vec::with_capacity(nblocks as usize * pg_constants::BLCKSZ as usize); + let mut slru_buf: Vec = Vec::with_capacity(nblocks as usize * BLCKSZ as usize); for blknum in 0..nblocks { let img = self .timeline .get_slru_page_at_lsn(slru, segno, blknum, self.lsn)?; if slru == SlruKind::Clog { - ensure!( - img.len() == pg_constants::BLCKSZ as usize - || img.len() == pg_constants::BLCKSZ as usize + 8 - ); + ensure!(img.len() == BLCKSZ as usize || img.len() == BLCKSZ as usize + 8); } else { - ensure!(img.len() == pg_constants::BLCKSZ as usize); + ensure!(img.len() == BLCKSZ as usize); } - slru_buf.extend_from_slice(&img[..pg_constants::BLCKSZ as usize]); + slru_buf.extend_from_slice(&img[..BLCKSZ as usize]); } let segname = format!("{}/{:>04X}", slru.to_str(), segno); diff --git a/pageserver/src/import_datadir.rs b/pageserver/src/import_datadir.rs index 7d1e8e43aa..729829c5e8 100644 --- a/pageserver/src/import_datadir.rs +++ b/pageserver/src/import_datadir.rs @@ -15,13 +15,24 @@ use crate::pgdatadir_mapping::*; use crate::reltag::{RelTag, SlruKind}; use crate::walingest::WalIngest; use crate::walrecord::DecodedWALRecord; -use postgres_ffi::relfile_utils::*; -use 
postgres_ffi::waldecoder::*; -use postgres_ffi::xlog_utils::*; +use postgres_ffi::v14::relfile_utils::*; +use postgres_ffi::v14::waldecoder::*; +use postgres_ffi::v14::xlog_utils::*; +use postgres_ffi::v14::{pg_constants, ControlFileData, DBState_DB_SHUTDOWNED}; use postgres_ffi::Oid; -use postgres_ffi::{pg_constants, ControlFileData, DBState_DB_SHUTDOWNED}; +use postgres_ffi::BLCKSZ; use utils::lsn::Lsn; +// Returns checkpoint LSN from controlfile +pub fn get_lsn_from_controlfile(path: &Path) -> Result { + // Read control file to extract the LSN + let controlfile_path = path.join("global").join("pg_control"); + let controlfile = ControlFileData::decode(&std::fs::read(controlfile_path)?)?; + let lsn = controlfile.checkPoint; + + Ok(Lsn(lsn)) +} + /// /// Import all relation data pages from local disk into the repository. /// @@ -110,8 +121,8 @@ fn import_rel( let mut buf: [u8; 8192] = [0u8; 8192]; - ensure!(len % pg_constants::BLCKSZ as usize == 0); - let nblocks = len / pg_constants::BLCKSZ as usize; + ensure!(len % BLCKSZ as usize == 0); + let nblocks = len / BLCKSZ as usize; let rel = RelTag { spcnode: spcoid, @@ -120,7 +131,7 @@ fn import_rel( forknum, }; - let mut blknum: u32 = segno * (1024 * 1024 * 1024 / pg_constants::BLCKSZ as u32); + let mut blknum: u32 = segno * (1024 * 1024 * 1024 / BLCKSZ as u32); // Call put_rel_creation for every segment of the relation, // because there is no guarantee about the order in which we are processing segments. @@ -144,8 +155,7 @@ fn import_rel( Err(err) => match err.kind() { std::io::ErrorKind::UnexpectedEof => { // reached EOF. That's expected. 
- let relative_blknum = - blknum - segno * (1024 * 1024 * 1024 / pg_constants::BLCKSZ as u32); + let relative_blknum = blknum - segno * (1024 * 1024 * 1024 / BLCKSZ as u32); ensure!(relative_blknum == nblocks as u32, "unexpected EOF"); break; } @@ -184,8 +194,8 @@ fn import_slru( .to_string_lossy(); let segno = u32::from_str_radix(filename, 16)?; - ensure!(len % pg_constants::BLCKSZ as usize == 0); // we assume SLRU block size is the same as BLCKSZ - let nblocks = len / pg_constants::BLCKSZ as usize; + ensure!(len % BLCKSZ as usize == 0); // we assume SLRU block size is the same as BLCKSZ + let nblocks = len / BLCKSZ as usize; ensure!(nblocks <= pg_constants::SLRU_PAGES_PER_SEGMENT as usize); diff --git a/pageserver/src/keyspace.rs b/pageserver/src/keyspace.rs index da213704f3..64024a2d8d 100644 --- a/pageserver/src/keyspace.rs +++ b/pageserver/src/keyspace.rs @@ -1,5 +1,5 @@ use crate::repository::{key_range_size, singleton_range, Key}; -use postgres_ffi::pg_constants; +use postgres_ffi::BLCKSZ; use std::ops::Range; /// @@ -19,7 +19,7 @@ impl KeySpace { /// pub fn partition(&self, target_size: u64) -> KeyPartitioning { // Assume that each value is 8k in size. 
- let target_nblocks = (target_size / pg_constants::BLCKSZ as u64) as usize; + let target_nblocks = (target_size / BLCKSZ as u64) as usize; let mut parts = Vec::new(); let mut current_part = Vec::new(); diff --git a/pageserver/src/layered_repository/timeline.rs b/pageserver/src/layered_repository/timeline.rs index e27619cc83..6ef4915bdb 100644 --- a/pageserver/src/layered_repository/timeline.rs +++ b/pageserver/src/layered_repository/timeline.rs @@ -45,7 +45,7 @@ use crate::reltag::RelTag; use crate::tenant_config::TenantConfOpt; use crate::DatadirTimeline; -use postgres_ffi::xlog_utils::to_pg_timestamp; +use postgres_ffi::v14::xlog_utils::to_pg_timestamp; use utils::{ lsn::{AtomicLsn, Lsn, RecordLsn}, seqwait::SeqWait, diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index 716df0f749..818eaf1b8f 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -83,7 +83,7 @@ pub fn get() -> &'static PageCache { } } -pub const PAGE_SZ: usize = postgres_ffi::pg_constants::BLCKSZ as usize; +pub const PAGE_SZ: usize = postgres_ffi::BLCKSZ as usize; const MAX_USAGE_COUNT: u8 = 5; /// diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 3c5ea5267e..b63bb90be1 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -40,9 +40,10 @@ use crate::thread_mgr; use crate::thread_mgr::ThreadKind; use crate::CheckpointConfig; use metrics::{register_histogram_vec, HistogramVec}; -use postgres_ffi::xlog_utils::to_pg_timestamp; +use postgres_ffi::v14::xlog_utils::to_pg_timestamp; -use postgres_ffi::pg_constants; +use postgres_ffi::v14::pg_constants::DEFAULTTABLESPACE_OID; +use postgres_ffi::BLCKSZ; // Wrapped in libpq CopyData enum PagestreamFeMessage { @@ -725,10 +726,9 @@ impl PageServerHandler { let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn)?; - let total_blocks = - 
timeline.get_db_size(pg_constants::DEFAULTTABLESPACE_OID, req.dbnode, lsn)?; + let total_blocks = timeline.get_db_size(DEFAULTTABLESPACE_OID, req.dbnode, lsn)?; - let db_size = total_blocks as i64 * pg_constants::BLCKSZ as i64; + let db_size = total_blocks as i64 * BLCKSZ as i64; Ok(PagestreamBeMessage::DbSize(PagestreamDbSizeResponse { db_size, diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 113f40302a..88fac0ad5a 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -13,8 +13,10 @@ use crate::repository::*; use crate::walrecord::ZenithWalRecord; use anyhow::{bail, ensure, Result}; use bytes::{Buf, Bytes}; -use postgres_ffi::xlog_utils::TimestampTz; -use postgres_ffi::{pg_constants, Oid, TransactionId}; +use postgres_ffi::v14::pg_constants; +use postgres_ffi::v14::xlog_utils::TimestampTz; +use postgres_ffi::BLCKSZ; +use postgres_ffi::{Oid, TransactionId}; use serde::{Deserialize, Serialize}; use std::collections::{HashMap, HashSet}; use std::ops::Range; @@ -297,9 +299,9 @@ pub trait DatadirTimeline: Timeline { let clog_page = self.get_slru_page_at_lsn(SlruKind::Clog, segno, blknum, probe_lsn)?; - if clog_page.len() == pg_constants::BLCKSZ as usize + 8 { + if clog_page.len() == BLCKSZ as usize + 8 { let mut timestamp_bytes = [0u8; 8]; - timestamp_bytes.copy_from_slice(&clog_page[pg_constants::BLCKSZ as usize..]); + timestamp_bytes.copy_from_slice(&clog_page[BLCKSZ as usize..]); let timestamp = TimestampTz::from_be_bytes(timestamp_bytes); if timestamp >= search_timestamp { @@ -382,7 +384,7 @@ pub trait DatadirTimeline: Timeline { total_size += relsize as usize; } } - Ok(total_size * pg_constants::BLCKSZ as usize) + Ok(total_size * BLCKSZ as usize) } /// @@ -912,7 +914,7 @@ impl<'a, T: DatadirTimeline> DatadirModification<'a, T> { result?; if pending_nblocks != 0 { - writer.update_current_logical_size(pending_nblocks * pg_constants::BLCKSZ as isize); + 
writer.update_current_logical_size(pending_nblocks * BLCKSZ as isize); self.pending_nblocks = 0; } @@ -940,7 +942,7 @@ impl<'a, T: DatadirTimeline> DatadirModification<'a, T> { writer.finish_write(lsn); if pending_nblocks != 0 { - writer.update_current_logical_size(pending_nblocks * pg_constants::BLCKSZ as isize); + writer.update_current_logical_size(pending_nblocks * BLCKSZ as isize); } Ok(()) @@ -1014,7 +1016,7 @@ struct SlruSegmentDirectory { segments: HashSet, } -static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; pg_constants::BLCKSZ as usize]); +static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]); // Layout of the Key address space // diff --git a/pageserver/src/reltag.rs b/pageserver/src/reltag.rs index fadd41f547..e3d08f8b3d 100644 --- a/pageserver/src/reltag.rs +++ b/pageserver/src/reltag.rs @@ -2,8 +2,9 @@ use serde::{Deserialize, Serialize}; use std::cmp::Ordering; use std::fmt; -use postgres_ffi::relfile_utils::forknumber_to_name; -use postgres_ffi::{pg_constants, Oid}; +use postgres_ffi::v14::pg_constants; +use postgres_ffi::v14::relfile_utils::forknumber_to_name; +use postgres_ffi::Oid; /// /// Relation data file segment id throughout the Postgres cluster. 
diff --git a/pageserver/src/timelines.rs b/pageserver/src/timelines.rs index ed5975d3bd..0d35195691 100644 --- a/pageserver/src/timelines.rs +++ b/pageserver/src/timelines.rs @@ -3,7 +3,7 @@ // use anyhow::{bail, ensure, Context, Result}; -use postgres_ffi::ControlFileData; + use std::{ fs, path::Path, @@ -69,16 +69,6 @@ pub fn create_repo( ))) } -// Returns checkpoint LSN from controlfile -fn get_lsn_from_controlfile(path: &Path) -> Result { - // Read control file to extract the LSN - let controlfile_path = path.join("global").join("pg_control"); - let controlfile = ControlFileData::decode(&fs::read(controlfile_path)?)?; - let lsn = controlfile.checkPoint; - - Ok(Lsn(lsn)) -} - // Create the cluster temporarily in 'initdbpath' directory inside the repository // to get bootstrap data for timeline initialization. // @@ -128,7 +118,7 @@ fn bootstrap_timeline( run_initdb(conf, &initdb_path)?; let pgdata_path = initdb_path; - let lsn = get_lsn_from_controlfile(&pgdata_path)?.align(); + let lsn = import_datadir::get_lsn_from_controlfile(&pgdata_path)?.align(); // Import the contents of the data directory at the initial checkpoint // LSN, and any WAL after that. diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index b8064849e0..1b046b9f33 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -22,8 +22,8 @@ //! bespoken Rust code. 
use anyhow::Context; -use postgres_ffi::nonrelfile_utils::clogpage_precedes; -use postgres_ffi::nonrelfile_utils::slru_may_delete_clogsegment; +use postgres_ffi::v14::nonrelfile_utils::clogpage_precedes; +use postgres_ffi::v14::nonrelfile_utils::slru_may_delete_clogsegment; use postgres_ffi::{page_is_new, page_set_lsn}; use anyhow::Result; @@ -33,10 +33,12 @@ use tracing::*; use crate::pgdatadir_mapping::*; use crate::reltag::{RelTag, SlruKind}; use crate::walrecord::*; -use postgres_ffi::nonrelfile_utils::mx_offset_to_member_segment; -use postgres_ffi::xlog_utils::*; +use postgres_ffi::v14::nonrelfile_utils::mx_offset_to_member_segment; +use postgres_ffi::v14::pg_constants; +use postgres_ffi::v14::xlog_utils::*; +use postgres_ffi::v14::CheckPoint; use postgres_ffi::TransactionId; -use postgres_ffi::{pg_constants, CheckPoint}; +use postgres_ffi::BLCKSZ; use utils::lsn::Lsn; static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; 8192]); @@ -293,7 +295,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> { // Extract page image from FPI record let img_len = blk.bimg_len as usize; let img_offs = blk.bimg_offset as usize; - let mut image = BytesMut::with_capacity(pg_constants::BLCKSZ as usize); + let mut image = BytesMut::with_capacity(BLCKSZ as usize); image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]); if blk.hole_length != 0 { @@ -309,7 +311,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> { if !page_is_new(&image) { page_set_lsn(&mut image, lsn) } - assert_eq!(image.len(), pg_constants::BLCKSZ as usize); + assert_eq!(image.len(), BLCKSZ as usize); self.put_rel_page_image(modification, rel, blk.blkno, image.freeze())?; } else { let rec = ZenithWalRecord::Postgres { diff --git a/pageserver/src/walreceiver/walreceiver_connection.rs b/pageserver/src/walreceiver/walreceiver_connection.rs index 16a1f232e3..0688086117 100644 --- a/pageserver/src/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/walreceiver/walreceiver_connection.rs @@ -27,7 +27,7 
@@ use crate::{ walingest::WalIngest, walrecord::DecodedWALRecord, }; -use postgres_ffi::waldecoder::WalStreamDecoder; +use postgres_ffi::v14::waldecoder::WalStreamDecoder; use utils::{lsn::Lsn, pq_proto::ReplicationFeedback, zid::ZTenantTimelineId}; /// Status of the connection. diff --git a/pageserver/src/walrecord.rs b/pageserver/src/walrecord.rs index 6b01d52005..c56b1c6c0c 100644 --- a/pageserver/src/walrecord.rs +++ b/pageserver/src/walrecord.rs @@ -3,9 +3,10 @@ //! use anyhow::Result; use bytes::{Buf, Bytes}; -use postgres_ffi::pg_constants; -use postgres_ffi::xlog_utils::{TimestampTz, XLOG_SIZE_OF_XLOG_RECORD}; -use postgres_ffi::XLogRecord; +use postgres_ffi::v14::pg_constants; +use postgres_ffi::v14::xlog_utils::{TimestampTz, XLOG_SIZE_OF_XLOG_RECORD}; +use postgres_ffi::v14::XLogRecord; +use postgres_ffi::BLCKSZ; use postgres_ffi::{BlockNumber, OffsetNumber}; use postgres_ffi::{MultiXactId, MultiXactOffset, MultiXactStatus, Oid, TransactionId}; use serde::{Deserialize, Serialize}; @@ -618,7 +619,7 @@ pub fn decode_wal_record( blk.hole_length = 0; } } else { - blk.hole_length = pg_constants::BLCKSZ - blk.bimg_len; + blk.hole_length = BLCKSZ - blk.bimg_len; } datatotal += blk.bimg_len as u32; blocks_total_len += blk.bimg_len as u32; @@ -628,9 +629,7 @@ pub fn decode_wal_record( * bimg_len < BLCKSZ if the HAS_HOLE flag is set. */ if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE != 0 - && (blk.hole_offset == 0 - || blk.hole_length == 0 - || blk.bimg_len == pg_constants::BLCKSZ) + && (blk.hole_offset == 0 || blk.hole_length == 0 || blk.bimg_len == BLCKSZ) { // TODO /* @@ -667,7 +666,7 @@ pub fn decode_wal_record( * flag is set. 
*/ if (blk.bimg_info & pg_constants::BKPIMAGE_IS_COMPRESSED == 0) - && blk.bimg_len == pg_constants::BLCKSZ + && blk.bimg_len == BLCKSZ { // TODO /* @@ -685,7 +684,7 @@ pub fn decode_wal_record( */ if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE == 0 && blk.bimg_info & pg_constants::BKPIMAGE_IS_COMPRESSED == 0 - && blk.bimg_len != pg_constants::BLCKSZ + && blk.bimg_len != BLCKSZ { // TODO /* diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 57817dbc9c..9cf347573a 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -44,11 +44,12 @@ use crate::reltag::{RelTag, SlruKind}; use crate::repository::Key; use crate::walrecord::ZenithWalRecord; use metrics::{register_histogram, register_int_counter, Histogram, IntCounter}; -use postgres_ffi::nonrelfile_utils::mx_offset_to_flags_bitshift; -use postgres_ffi::nonrelfile_utils::mx_offset_to_flags_offset; -use postgres_ffi::nonrelfile_utils::mx_offset_to_member_offset; -use postgres_ffi::nonrelfile_utils::transaction_id_set_status; -use postgres_ffi::pg_constants; +use postgres_ffi::v14::nonrelfile_utils::{ + mx_offset_to_flags_bitshift, mx_offset_to_flags_offset, mx_offset_to_member_offset, + transaction_id_set_status, +}; +use postgres_ffi::v14::pg_constants; +use postgres_ffi::BLCKSZ; /// /// `RelTag` + block number (`blknum`) gives us a unique id of the page in the cluster. @@ -417,10 +418,10 @@ impl PostgresRedoManager { } // Append the timestamp - if page.len() == pg_constants::BLCKSZ as usize + 8 { - page.truncate(pg_constants::BLCKSZ as usize); + if page.len() == BLCKSZ as usize + 8 { + page.truncate(BLCKSZ as usize); } - if page.len() == pg_constants::BLCKSZ as usize { + if page.len() == BLCKSZ as usize { page.extend_from_slice(×tamp.to_be_bytes()); } else { warn!( @@ -741,7 +742,7 @@ impl PostgresRedoProcess { // We expect the WAL redo process to respond with an 8k page image. We read it // into this buffer. 
- let mut resultbuf = vec![0; pg_constants::BLCKSZ.into()]; + let mut resultbuf = vec![0; BLCKSZ.into()]; let mut nresult: usize = 0; // # of bytes read into 'resultbuf' so far // Prepare for calling poll() @@ -754,7 +755,7 @@ impl PostgresRedoProcess { // We do three things simultaneously: send the old base image and WAL records to // the child process's stdin, read the result from child's stdout, and forward any logging // information that the child writes to its stderr to the page server's log. - while nresult < pg_constants::BLCKSZ.into() { + while nresult < BLCKSZ.into() { // If we have more data to write, wake up if 'stdin' becomes writeable or // we have data to read. Otherwise only wake up if there's data to read. let nfds = if nwrite < writebuf.len() { 3 } else { 2 }; diff --git a/safekeeper/src/handler.rs b/safekeeper/src/handler.rs index a8121e829e..63bc9bd517 100644 --- a/safekeeper/src/handler.rs +++ b/safekeeper/src/handler.rs @@ -9,7 +9,7 @@ use crate::timeline::{Timeline, TimelineTools}; use crate::SafeKeeperConf; use anyhow::{bail, Context, Result}; -use postgres_ffi::xlog_utils::PG_TLI; +use postgres_ffi::PG_TLI; use regex::Regex; use std::str::FromStr; use std::sync::Arc; diff --git a/safekeeper/src/json_ctrl.rs b/safekeeper/src/json_ctrl.rs index 97fb3654d2..3f84e7b183 100644 --- a/safekeeper/src/json_ctrl.rs +++ b/safekeeper/src/json_ctrl.rs @@ -7,8 +7,7 @@ //! 
use anyhow::Result; -use bytes::{BufMut, Bytes, BytesMut}; -use crc32c::crc32c_append; +use bytes::Bytes; use serde::{Deserialize, Serialize}; use tracing::*; @@ -19,9 +18,8 @@ use crate::safekeeper::{ }; use crate::safekeeper::{SafeKeeperState, Term, TermHistory, TermSwitchEntry}; use crate::timeline::TimelineTools; -use postgres_ffi::pg_constants; -use postgres_ffi::xlog_utils; -use postgres_ffi::{uint32, uint64, Oid, XLogRecord}; +use postgres_ffi::v14::pg_constants; +use postgres_ffi::v14::xlog_utils; use utils::{ lsn::Lsn, postgres_backend::PostgresBackend, @@ -144,7 +142,7 @@ fn append_logical_message( spg: &mut SafekeeperPostgresHandler, msg: &AppendLogicalMessage, ) -> Result { - let wal_data = encode_logical_message(&msg.lm_prefix, &msg.lm_message); + let wal_data = xlog_utils::encode_logical_message(&msg.lm_prefix, &msg.lm_message); let sk_state = spg.timeline.get().get_state().1; let begin_lsn = msg.begin_lsn; @@ -182,90 +180,3 @@ fn append_logical_message( append_response, }) } - -#[repr(C)] -#[derive(Debug, Clone, Default, Serialize, Deserialize)] -struct XlLogicalMessage { - db_id: Oid, - transactional: uint32, // bool, takes 4 bytes due to alignment in C structures - prefix_size: uint64, - message_size: uint64, -} - -impl XlLogicalMessage { - pub fn encode(&self) -> Bytes { - use utils::bin_ser::LeSer; - self.ser().unwrap().into() - } -} - -/// Create new WAL record for non-transactional logical message. -/// Used for creating artificial WAL for tests, as LogicalMessage -/// record is basically no-op. 
-fn encode_logical_message(prefix: &str, message: &str) -> Vec<u8> { - let mut prefix_bytes = BytesMut::with_capacity(prefix.len() + 1); - prefix_bytes.put(prefix.as_bytes()); - prefix_bytes.put_u8(0); - - let message_bytes = message.as_bytes(); - - let logical_message = XlLogicalMessage { - db_id: 0, - transactional: 0, - prefix_size: prefix_bytes.len() as u64, - message_size: message_bytes.len() as u64, - }; - - let mainrdata = logical_message.encode(); - let mainrdata_len: usize = mainrdata.len() + prefix_bytes.len() + message_bytes.len(); - // only short mainrdata is supported for now - assert!(mainrdata_len <= 255); - let mainrdata_len = mainrdata_len as u8; - - let mut data: Vec<u8> = vec![pg_constants::XLR_BLOCK_ID_DATA_SHORT, mainrdata_len]; - data.extend_from_slice(&mainrdata); - data.extend_from_slice(&prefix_bytes); - data.extend_from_slice(message_bytes); - - let total_len = xlog_utils::XLOG_SIZE_OF_XLOG_RECORD + data.len(); - - let mut header = XLogRecord { - xl_tot_len: total_len as u32, - xl_xid: 0, - xl_prev: 0, - xl_info: 0, - xl_rmid: 21, - __bindgen_padding_0: [0u8; 2usize], - xl_crc: 0, // crc will be calculated later - }; - - let header_bytes = header.encode().expect("failed to encode header"); - let crc = crc32c_append(0, &data); - let crc = crc32c_append(crc, &header_bytes[0..xlog_utils::XLOG_RECORD_CRC_OFFS]); - header.xl_crc = crc; - - let mut wal: Vec<u8> = Vec::new(); - wal.extend_from_slice(&header.encode().expect("failed to encode header")); - wal.extend_from_slice(&data); - - // WAL start position must be aligned at 8 bytes, - // this will add padding for the next WAL record.
- const PADDING: usize = 8; - let padding_rem = wal.len() % PADDING; - if padding_rem != 0 { - wal.resize(wal.len() + PADDING - padding_rem, 0); - } - - wal -} - -#[test] -fn test_encode_logical_message() { - let expected = [ - 64, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 21, 0, 0, 170, 34, 166, 227, 255, 38, - 0, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 112, 114, 101, 102, - 105, 120, 0, 109, 101, 115, 115, 97, 103, 101, - ]; - let actual = encode_logical_message("prefix", "message"); - assert_eq!(expected, actual[..]); -} diff --git a/safekeeper/src/metrics.rs b/safekeeper/src/metrics.rs index fe4f9d231c..648f0634f8 100644 --- a/safekeeper/src/metrics.rs +++ b/safekeeper/src/metrics.rs @@ -7,7 +7,7 @@ use metrics::{ proto::MetricFamily, Gauge, IntGaugeVec, }; -use postgres_ffi::xlog_utils::XLogSegNo; +use postgres_ffi::v14::xlog_utils::XLogSegNo; use utils::{lsn::Lsn, zid::ZTenantTimelineId}; use crate::{ diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index 88747f14e5..22f8ca2de4 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -5,9 +5,7 @@ use byteorder::{LittleEndian, ReadBytesExt}; use bytes::{Buf, BufMut, Bytes, BytesMut}; use etcd_broker::subscription_value::SkTimelineInfo; -use postgres_ffi::xlog_utils::TimeLineID; - -use postgres_ffi::xlog_utils::XLogSegNo; +use postgres_ffi::v14::xlog_utils::{TimeLineID, XLogSegNo, MAX_SEND_SIZE}; use serde::{Deserialize, Serialize}; use std::cmp::max; use std::cmp::min; @@ -19,7 +17,6 @@ use crate::control_file; use crate::send_wal::HotStandbyFeedback; use crate::wal_storage; -use postgres_ffi::xlog_utils::MAX_SEND_SIZE; use utils::{ bin_ser::LeSer, lsn::Lsn, diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs index 4a9c56859f..243d7bf7d0 100644 --- a/safekeeper/src/send_wal.rs +++ b/safekeeper/src/send_wal.rs @@ -6,7 +6,7 @@ use crate::timeline::{ReplicaState, Timeline, TimelineTools}; use 
crate::wal_storage::WalReader; use anyhow::{bail, Context, Result}; -use postgres_ffi::xlog_utils::{get_current_timestamp, TimestampTz, MAX_SEND_SIZE}; +use postgres_ffi::v14::xlog_utils::{get_current_timestamp, TimestampTz, MAX_SEND_SIZE}; use bytes::Bytes; use serde::{Deserialize, Serialize}; diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index 161fca3595..3a10c5d59e 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -4,8 +4,9 @@ use anyhow::{bail, Context, Result}; use etcd_broker::subscription_value::SkTimelineInfo; + use once_cell::sync::Lazy; -use postgres_ffi::xlog_utils::XLogSegNo; +use postgres_ffi::v14::xlog_utils::XLogSegNo; use serde::Serialize; use tokio::sync::watch; diff --git a/safekeeper/src/wal_backup.rs b/safekeeper/src/wal_backup.rs index b2f9d8d4f3..3552452470 100644 --- a/safekeeper/src/wal_backup.rs +++ b/safekeeper/src/wal_backup.rs @@ -11,7 +11,8 @@ use std::pin::Pin; use std::sync::Arc; use std::time::Duration; -use postgres_ffi::xlog_utils::{XLogFileName, XLogSegNo, XLogSegNoOffsetToRecPtr, PG_TLI}; +use postgres_ffi::v14::xlog_utils::{XLogFileName, XLogSegNo, XLogSegNoOffsetToRecPtr}; +use postgres_ffi::PG_TLI; use remote_storage::{GenericRemoteStorage, RemoteStorage}; use tokio::fs::File; use tokio::runtime::Builder; diff --git a/safekeeper/src/wal_storage.rs b/safekeeper/src/wal_storage.rs index 5f4bf588c7..6a45ae1411 100644 --- a/safekeeper/src/wal_storage.rs +++ b/safekeeper/src/wal_storage.rs @@ -13,9 +13,10 @@ use std::pin::Pin; use tokio::io::AsyncRead; use once_cell::sync::Lazy; -use postgres_ffi::xlog_utils::{ - find_end_of_wal, IsPartialXLogFileName, IsXLogFileName, XLogFromFileName, XLogSegNo, PG_TLI, +use postgres_ffi::v14::xlog_utils::{ + find_end_of_wal, IsPartialXLogFileName, IsXLogFileName, XLogFromFileName, XLogSegNo, }; +use postgres_ffi::PG_TLI; use std::cmp::min; use std::fs::{self, remove_file, File, OpenOptions}; @@ -30,9 +31,10 @@ use 
crate::safekeeper::SafeKeeperState; use crate::wal_backup::read_object; use crate::SafeKeeperConf; -use postgres_ffi::xlog_utils::{XLogFileName, XLOG_BLCKSZ}; +use postgres_ffi::v14::xlog_utils::XLogFileName; +use postgres_ffi::XLOG_BLCKSZ; -use postgres_ffi::waldecoder::WalStreamDecoder; +use postgres_ffi::v14::waldecoder::WalStreamDecoder; use metrics::{register_histogram_vec, Histogram, HistogramVec, DISK_WRITE_SECONDS_BUCKETS};