diff --git a/libs/postgres_ffi/src/controlfile_utils.rs b/libs/postgres_ffi/src/controlfile_utils.rs index 4df2342b90..0918d15001 100644 --- a/libs/postgres_ffi/src/controlfile_utils.rs +++ b/libs/postgres_ffi/src/controlfile_utils.rs @@ -23,7 +23,7 @@ //! information. You can use PostgreSQL's pg_controldata utility to view its //! contents. //! -use crate::{ControlFileData, PG_CONTROL_FILE_SIZE}; +use super::bindings::{ControlFileData, PG_CONTROL_FILE_SIZE}; use anyhow::{bail, Result}; use bytes::{Bytes, BytesMut}; diff --git a/libs/postgres_ffi/src/lib.rs b/libs/postgres_ffi/src/lib.rs index 28d9a13dbf..022355329c 100644 --- a/libs/postgres_ffi/src/lib.rs +++ b/libs/postgres_ffi/src/lib.rs @@ -7,21 +7,62 @@ // https://github.com/rust-lang/rust-bindgen/issues/1651 #![allow(deref_nullptr)] -use serde::{Deserialize, Serialize}; use utils::lsn::Lsn; -include!(concat!(env!("OUT_DIR"), "/bindings.rs")); +macro_rules! postgres_ffi { + ($version:ident) => { + #[path = "."] + pub mod $version { + // fixme: does this have to be 'pub'? 
+ pub mod bindings { + // bindgen generates bindings for a lot of stuff we don't need + #![allow(dead_code)] -pub mod controlfile_utils; -pub mod nonrelfile_utils; -pub mod pg_constants; -pub mod relfile_utils; -pub mod waldecoder; -pub mod xlog_utils; + use serde::{Deserialize, Serialize}; + include!(concat!(env!("OUT_DIR"), "/bindings.rs")); + } + pub mod controlfile_utils; + pub mod nonrelfile_utils; + pub mod pg_constants; + pub mod relfile_utils; + pub mod waldecoder; + pub mod xlog_utils; + + // Re-export some symbols from bindings + pub use bindings::DBState_DB_SHUTDOWNED; + pub use bindings::{CheckPoint, ControlFileData, XLogRecord}; + } + }; +} + +postgres_ffi!(v14); + +// Export some widely used datatypes that are unlikely to change across Postgres versions +pub use v14::bindings::{uint32, uint64, Oid}; +pub use v14::bindings::{BlockNumber, OffsetNumber}; +pub use v14::bindings::{MultiXactId, TransactionId}; + +// Likewise for these, although the assumption that these don't change is a little more iffy. +pub use v14::bindings::{MultiXactOffset, MultiXactStatus}; + +// from pg_config.h. These can be changed with configure options --with-blocksize=BLOCKSIZE and +// --with-segsize=SEGSIZE, but assume the defaults for now. +pub const BLCKSZ: u16 = 8192; +pub const RELSEG_SIZE: u32 = 1024 * 1024 * 1024 / (BLCKSZ as u32); +pub const XLOG_BLCKSZ: usize = 8192; + +// PG timeline is always 1, changing it doesn't have any useful meaning in Neon. +// +// NOTE: this is not to be confused with Neon timelines; different concept! +// +// It's a shaky assumption, that it's always 1. We might import a +// PostgreSQL data directory that has gone through timeline bumps, +// for example. FIXME later. 
+pub const PG_TLI: u32 = 1; // See TransactionIdIsNormal in transam.h pub const fn transaction_id_is_normal(id: TransactionId) -> bool { - id > pg_constants::FIRST_NORMAL_TRANSACTION_ID + id > v14::pg_constants::FIRST_NORMAL_TRANSACTION_ID } // See TransactionIdPrecedes in transam.c diff --git a/libs/postgres_ffi/src/nonrelfile_utils.rs b/libs/postgres_ffi/src/nonrelfile_utils.rs index b92207cd81..04ef346d88 100644 --- a/libs/postgres_ffi/src/nonrelfile_utils.rs +++ b/libs/postgres_ffi/src/nonrelfile_utils.rs @@ -1,11 +1,12 @@ //! //! Common utilities for dealing with PostgreSQL non-relation files. //! -use crate::{pg_constants, transaction_id_precedes}; +use crate::transaction_id_precedes; +use super::pg_constants; use bytes::BytesMut; use log::*; -use crate::MultiXactId; +use super::bindings::MultiXactId; pub fn transaction_id_set_status(xid: u32, status: u8, page: &mut BytesMut) { trace!( diff --git a/libs/postgres_ffi/src/pg_constants.rs b/libs/postgres_ffi/src/pg_constants.rs index 7230b841f5..42b5c5d842 100644 --- a/libs/postgres_ffi/src/pg_constants.rs +++ b/libs/postgres_ffi/src/pg_constants.rs @@ -7,7 +7,8 @@ //! comments on them. //! -use crate::PageHeaderData; +use super::bindings::PageHeaderData; +use crate::BLCKSZ; // // From pg_tablespace_d.h @@ -31,11 +32,6 @@ pub const SMGR_TRUNCATE_HEAP: u32 = 0x0001; pub const SMGR_TRUNCATE_VM: u32 = 0x0002; pub const SMGR_TRUNCATE_FSM: u32 = 0x0004; -// from pg_config.h. These can be changed with configure options --with-blocksize=BLOCKSIZE and -// --with-segsize=SEGSIZE, but assume the defaults for now. 
-pub const BLCKSZ: u16 = 8192; -pub const RELSEG_SIZE: u32 = 1024 * 1024 * 1024 / (BLCKSZ as u32); - // // From bufpage.h // @@ -213,7 +209,6 @@ pub const FIRST_NORMAL_OBJECT_ID: u32 = 16384; /* FIXME: pageserver should request wal_seg_size from compute node */ pub const WAL_SEGMENT_SIZE: usize = 16 * 1024 * 1024; -pub const XLOG_BLCKSZ: usize = 8192; pub const XLOG_CHECKPOINT_SHUTDOWN: u8 = 0x00; pub const XLOG_CHECKPOINT_ONLINE: u8 = 0x10; pub const XLP_LONG_HEADER: u16 = 0x0002; diff --git a/libs/postgres_ffi/src/relfile_utils.rs b/libs/postgres_ffi/src/relfile_utils.rs index cc9d6470c0..f3476acc9c 100644 --- a/libs/postgres_ffi/src/relfile_utils.rs +++ b/libs/postgres_ffi/src/relfile_utils.rs @@ -1,7 +1,7 @@ //! //! Common utilities for dealing with PostgreSQL relation files. //! -use crate::pg_constants; +use super::pg_constants; use once_cell::sync::OnceCell; use regex::Regex; diff --git a/libs/postgres_ffi/src/waldecoder.rs b/libs/postgres_ffi/src/waldecoder.rs index cbb761236c..0e1c9567cb 100644 --- a/libs/postgres_ffi/src/waldecoder.rs +++ b/libs/postgres_ffi/src/waldecoder.rs @@ -10,10 +10,7 @@ //! use super::pg_constants; use super::xlog_utils::*; -use super::XLogLongPageHeaderData; -use super::XLogPageHeaderData; -use super::XLogRecord; -use super::XLOG_PAGE_MAGIC; +use super::bindings::{XLogLongPageHeaderData, XLogPageHeaderData, XLogRecord, XLOG_PAGE_MAGIC}; use bytes::{Buf, BufMut, Bytes, BytesMut}; use crc32c::*; use log::*; diff --git a/libs/postgres_ffi/src/xlog_utils.rs b/libs/postgres_ffi/src/xlog_utils.rs index 956f53ce85..e7838c3f2c 100644 --- a/libs/postgres_ffi/src/xlog_utils.rs +++ b/libs/postgres_ffi/src/xlog_utils.rs @@ -7,22 +7,24 @@ // have been named the same as the corresponding PostgreSQL functions instead. 
// -use crate::pg_constants; -use crate::CheckPoint; -use crate::FullTransactionId; -use crate::XLogLongPageHeaderData; -use crate::XLogPageHeaderData; -use crate::XLogRecord; -use crate::XLOG_PAGE_MAGIC; +use crc32c::crc32c_append; -use crate::pg_constants::WAL_SEGMENT_SIZE; -use crate::waldecoder::WalStreamDecoder; +use super::bindings::{ + CheckPoint, FullTransactionId, XLogLongPageHeaderData, XLogPageHeaderData, XLogRecord, + XLOG_PAGE_MAGIC, +}; +use super::pg_constants; +use super::pg_constants::WAL_SEGMENT_SIZE; +use crate::v14::waldecoder::WalStreamDecoder; +use crate::PG_TLI; +use crate::{uint32, uint64, Oid}; use bytes::BytesMut; use bytes::{Buf, Bytes}; use log::*; +use serde::Serialize; use std::fs::File; use std::io::prelude::*; use std::io::ErrorKind; @@ -47,9 +49,6 @@ pub const XLOG_SIZE_OF_XLOG_RECORD: usize = std::mem::size_of::(); #[allow(clippy::identity_op)] pub const SIZE_OF_XLOG_RECORD_DATA_HEADER_SHORT: usize = 1 * 2; -// PG timeline is always 1, changing it doesn't have useful meaning in Zenith. -pub const PG_TLI: u32 = 1; - pub type XLogRecPtr = u64; pub type TimeLineID = u32; pub type TimestampTz = i64; @@ -346,6 +345,85 @@ pub fn generate_wal_segment(segno: u64, system_id: u64) -> Result Bytes { + use utils::bin_ser::LeSer; + self.ser().unwrap().into() + } +} + +/// Create new WAL record for non-transactional logical message. +/// Used for creating artificial WAL for tests, as LogicalMessage +/// record is basically no-op. +/// +/// NOTE: This leaves the xl_prev field zero. The safekeeper and +/// pageserver tolerate that, but PostgreSQL does not. 
+pub fn encode_logical_message(prefix: &str, message: &str) -> Vec { + let mut prefix_bytes: Vec = Vec::with_capacity(prefix.len() + 1); + prefix_bytes.write_all(prefix.as_bytes()).unwrap(); + prefix_bytes.push(0); + + let message_bytes = message.as_bytes(); + + let logical_message = XlLogicalMessage { + db_id: 0, + transactional: 0, + prefix_size: prefix_bytes.len() as u64, + message_size: message_bytes.len() as u64, + }; + + let mainrdata = logical_message.encode(); + let mainrdata_len: usize = mainrdata.len() + prefix_bytes.len() + message_bytes.len(); + // only short mainrdata is supported for now + assert!(mainrdata_len <= 255); + let mainrdata_len = mainrdata_len as u8; + + let mut data: Vec = vec![pg_constants::XLR_BLOCK_ID_DATA_SHORT, mainrdata_len]; + data.extend_from_slice(&mainrdata); + data.extend_from_slice(&prefix_bytes); + data.extend_from_slice(message_bytes); + + let total_len = XLOG_SIZE_OF_XLOG_RECORD + data.len(); + + let mut header = XLogRecord { + xl_tot_len: total_len as u32, + xl_xid: 0, + xl_prev: 0, + xl_info: 0, + xl_rmid: 21, + __bindgen_padding_0: [0u8; 2usize], + xl_crc: 0, // crc will be calculated later + }; + + let header_bytes = header.encode().expect("failed to encode header"); + let crc = crc32c_append(0, &data); + let crc = crc32c_append(crc, &header_bytes[0..XLOG_RECORD_CRC_OFFS]); + header.xl_crc = crc; + + let mut wal: Vec = Vec::new(); + wal.extend_from_slice(&header.encode().expect("failed to encode header")); + wal.extend_from_slice(&data); + + // WAL start position must be aligned at 8 bytes, + // this will add padding for the next WAL record. 
+ const PADDING: usize = 8; + let padding_rem = wal.len() % PADDING; + if padding_rem != 0 { + wal.resize(wal.len() + PADDING - padding_rem, 0); + } + + wal +} + #[cfg(test)] mod tests { use super::*; @@ -547,4 +625,15 @@ mod tests { checkpoint.update_next_xid(1024); assert_eq!(checkpoint.nextXid.value, 2048); } + + #[test] + pub fn test_encode_logical_message() { + let expected = [ + 64, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 21, 0, 0, 170, 34, 166, 227, 255, + 38, 0, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 112, 114, + 101, 102, 105, 120, 0, 109, 101, 115, 115, 97, 103, 101, + ]; + let actual = encode_logical_message("prefix", "message"); + assert_eq!(expected, actual[..]); + } } diff --git a/libs/postgres_ffi/wal_craft/src/lib.rs b/libs/postgres_ffi/wal_craft/src/lib.rs index e3b666da41..6ac5afb27f 100644 --- a/libs/postgres_ffi/wal_craft/src/lib.rs +++ b/libs/postgres_ffi/wal_craft/src/lib.rs @@ -4,8 +4,8 @@ use log::*; use once_cell::sync::Lazy; use postgres::types::PgLsn; use postgres::Client; -use postgres_ffi::pg_constants::WAL_SEGMENT_SIZE; -use postgres_ffi::xlog_utils::{ +use postgres_ffi::v14::pg_constants::WAL_SEGMENT_SIZE; +use postgres_ffi::v14::xlog_utils::{ XLOG_BLCKSZ, XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD, }; use std::cmp::Ordering; diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs index 5837447ce8..33f072553f 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -24,8 +24,13 @@ use tracing::*; use crate::reltag::{RelTag, SlruKind}; use crate::DatadirTimeline; -use postgres_ffi::xlog_utils::*; -use postgres_ffi::*; + +use postgres_ffi::v14::pg_constants; +use postgres_ffi::v14::xlog_utils::{generate_wal_segment, normalize_lsn, XLogFileName}; +use postgres_ffi::v14::{CheckPoint, ControlFileData}; +use postgres_ffi::TransactionId; +use postgres_ffi::PG_TLI; +use postgres_ffi::{BLCKSZ, RELSEG_SIZE}; use utils::lsn::Lsn; /// This is short-living 
object only for the time of tarball creation, @@ -200,7 +205,7 @@ where } // Add a file for each chunk of blocks (aka segment) - let chunks = (0..nblocks).chunks(pg_constants::RELSEG_SIZE as usize); + let chunks = (0..nblocks).chunks(RELSEG_SIZE as usize); for (seg, blocks) in chunks.into_iter().enumerate() { let mut segment_data: Vec = vec![]; for blknum in blocks { @@ -220,23 +225,19 @@ where fn add_slru_segment(&mut self, slru: SlruKind, segno: u32) -> anyhow::Result<()> { let nblocks = self.timeline.get_slru_segment_size(slru, segno, self.lsn)?; - let mut slru_buf: Vec = - Vec::with_capacity(nblocks as usize * pg_constants::BLCKSZ as usize); + let mut slru_buf: Vec = Vec::with_capacity(nblocks as usize * BLCKSZ as usize); for blknum in 0..nblocks { let img = self .timeline .get_slru_page_at_lsn(slru, segno, blknum, self.lsn)?; if slru == SlruKind::Clog { - ensure!( - img.len() == pg_constants::BLCKSZ as usize - || img.len() == pg_constants::BLCKSZ as usize + 8 - ); + ensure!(img.len() == BLCKSZ as usize || img.len() == BLCKSZ as usize + 8); } else { - ensure!(img.len() == pg_constants::BLCKSZ as usize); + ensure!(img.len() == BLCKSZ as usize); } - slru_buf.extend_from_slice(&img[..pg_constants::BLCKSZ as usize]); + slru_buf.extend_from_slice(&img[..BLCKSZ as usize]); } let segname = format!("{}/{:>04X}", slru.to_str(), segno); diff --git a/pageserver/src/import_datadir.rs b/pageserver/src/import_datadir.rs index 7d1e8e43aa..729829c5e8 100644 --- a/pageserver/src/import_datadir.rs +++ b/pageserver/src/import_datadir.rs @@ -15,13 +15,24 @@ use crate::pgdatadir_mapping::*; use crate::reltag::{RelTag, SlruKind}; use crate::walingest::WalIngest; use crate::walrecord::DecodedWALRecord; -use postgres_ffi::relfile_utils::*; -use postgres_ffi::waldecoder::*; -use postgres_ffi::xlog_utils::*; +use postgres_ffi::v14::relfile_utils::*; +use postgres_ffi::v14::waldecoder::*; +use postgres_ffi::v14::xlog_utils::*; +use postgres_ffi::v14::{pg_constants, ControlFileData, 
DBState_DB_SHUTDOWNED}; use postgres_ffi::Oid; -use postgres_ffi::{pg_constants, ControlFileData, DBState_DB_SHUTDOWNED}; +use postgres_ffi::BLCKSZ; use utils::lsn::Lsn; +// Returns checkpoint LSN from controlfile +pub fn get_lsn_from_controlfile(path: &Path) -> Result { + // Read control file to extract the LSN + let controlfile_path = path.join("global").join("pg_control"); + let controlfile = ControlFileData::decode(&std::fs::read(controlfile_path)?)?; + let lsn = controlfile.checkPoint; + + Ok(Lsn(lsn)) +} + /// /// Import all relation data pages from local disk into the repository. /// @@ -110,8 +121,8 @@ fn import_rel( let mut buf: [u8; 8192] = [0u8; 8192]; - ensure!(len % pg_constants::BLCKSZ as usize == 0); - let nblocks = len / pg_constants::BLCKSZ as usize; + ensure!(len % BLCKSZ as usize == 0); + let nblocks = len / BLCKSZ as usize; let rel = RelTag { spcnode: spcoid, @@ -120,7 +131,7 @@ fn import_rel( forknum, }; - let mut blknum: u32 = segno * (1024 * 1024 * 1024 / pg_constants::BLCKSZ as u32); + let mut blknum: u32 = segno * (1024 * 1024 * 1024 / BLCKSZ as u32); // Call put_rel_creation for every segment of the relation, // because there is no guarantee about the order in which we are processing segments. @@ -144,8 +155,7 @@ fn import_rel( Err(err) => match err.kind() { std::io::ErrorKind::UnexpectedEof => { // reached EOF. That's expected. 
- let relative_blknum = - blknum - segno * (1024 * 1024 * 1024 / pg_constants::BLCKSZ as u32); + let relative_blknum = blknum - segno * (1024 * 1024 * 1024 / BLCKSZ as u32); ensure!(relative_blknum == nblocks as u32, "unexpected EOF"); break; } @@ -184,8 +194,8 @@ fn import_slru( .to_string_lossy(); let segno = u32::from_str_radix(filename, 16)?; - ensure!(len % pg_constants::BLCKSZ as usize == 0); // we assume SLRU block size is the same as BLCKSZ - let nblocks = len / pg_constants::BLCKSZ as usize; + ensure!(len % BLCKSZ as usize == 0); // we assume SLRU block size is the same as BLCKSZ + let nblocks = len / BLCKSZ as usize; ensure!(nblocks <= pg_constants::SLRU_PAGES_PER_SEGMENT as usize); diff --git a/pageserver/src/keyspace.rs b/pageserver/src/keyspace.rs index da213704f3..64024a2d8d 100644 --- a/pageserver/src/keyspace.rs +++ b/pageserver/src/keyspace.rs @@ -1,5 +1,5 @@ use crate::repository::{key_range_size, singleton_range, Key}; -use postgres_ffi::pg_constants; +use postgres_ffi::BLCKSZ; use std::ops::Range; /// @@ -19,7 +19,7 @@ impl KeySpace { /// pub fn partition(&self, target_size: u64) -> KeyPartitioning { // Assume that each value is 8k in size. 
- let target_nblocks = (target_size / pg_constants::BLCKSZ as u64) as usize; + let target_nblocks = (target_size / BLCKSZ as u64) as usize; let mut parts = Vec::new(); let mut current_part = Vec::new(); diff --git a/pageserver/src/layered_repository/timeline.rs b/pageserver/src/layered_repository/timeline.rs index e27619cc83..6ef4915bdb 100644 --- a/pageserver/src/layered_repository/timeline.rs +++ b/pageserver/src/layered_repository/timeline.rs @@ -45,7 +45,7 @@ use crate::reltag::RelTag; use crate::tenant_config::TenantConfOpt; use crate::DatadirTimeline; -use postgres_ffi::xlog_utils::to_pg_timestamp; +use postgres_ffi::v14::xlog_utils::to_pg_timestamp; use utils::{ lsn::{AtomicLsn, Lsn, RecordLsn}, seqwait::SeqWait, diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index 716df0f749..818eaf1b8f 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -83,7 +83,7 @@ pub fn get() -> &'static PageCache { } } -pub const PAGE_SZ: usize = postgres_ffi::pg_constants::BLCKSZ as usize; +pub const PAGE_SZ: usize = postgres_ffi::BLCKSZ as usize; const MAX_USAGE_COUNT: u8 = 5; /// diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 3c5ea5267e..b63bb90be1 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -40,9 +40,10 @@ use crate::thread_mgr; use crate::thread_mgr::ThreadKind; use crate::CheckpointConfig; use metrics::{register_histogram_vec, HistogramVec}; -use postgres_ffi::xlog_utils::to_pg_timestamp; +use postgres_ffi::v14::xlog_utils::to_pg_timestamp; -use postgres_ffi::pg_constants; +use postgres_ffi::v14::pg_constants::DEFAULTTABLESPACE_OID; +use postgres_ffi::BLCKSZ; // Wrapped in libpq CopyData enum PagestreamFeMessage { @@ -725,10 +726,9 @@ impl PageServerHandler { let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn)?; - let total_blocks = - 
timeline.get_db_size(pg_constants::DEFAULTTABLESPACE_OID, req.dbnode, lsn)?; + let total_blocks = timeline.get_db_size(DEFAULTTABLESPACE_OID, req.dbnode, lsn)?; - let db_size = total_blocks as i64 * pg_constants::BLCKSZ as i64; + let db_size = total_blocks as i64 * BLCKSZ as i64; Ok(PagestreamBeMessage::DbSize(PagestreamDbSizeResponse { db_size, diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 113f40302a..88fac0ad5a 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -13,8 +13,10 @@ use crate::repository::*; use crate::walrecord::ZenithWalRecord; use anyhow::{bail, ensure, Result}; use bytes::{Buf, Bytes}; -use postgres_ffi::xlog_utils::TimestampTz; -use postgres_ffi::{pg_constants, Oid, TransactionId}; +use postgres_ffi::v14::pg_constants; +use postgres_ffi::v14::xlog_utils::TimestampTz; +use postgres_ffi::BLCKSZ; +use postgres_ffi::{Oid, TransactionId}; use serde::{Deserialize, Serialize}; use std::collections::{HashMap, HashSet}; use std::ops::Range; @@ -297,9 +299,9 @@ pub trait DatadirTimeline: Timeline { let clog_page = self.get_slru_page_at_lsn(SlruKind::Clog, segno, blknum, probe_lsn)?; - if clog_page.len() == pg_constants::BLCKSZ as usize + 8 { + if clog_page.len() == BLCKSZ as usize + 8 { let mut timestamp_bytes = [0u8; 8]; - timestamp_bytes.copy_from_slice(&clog_page[pg_constants::BLCKSZ as usize..]); + timestamp_bytes.copy_from_slice(&clog_page[BLCKSZ as usize..]); let timestamp = TimestampTz::from_be_bytes(timestamp_bytes); if timestamp >= search_timestamp { @@ -382,7 +384,7 @@ pub trait DatadirTimeline: Timeline { total_size += relsize as usize; } } - Ok(total_size * pg_constants::BLCKSZ as usize) + Ok(total_size * BLCKSZ as usize) } /// @@ -912,7 +914,7 @@ impl<'a, T: DatadirTimeline> DatadirModification<'a, T> { result?; if pending_nblocks != 0 { - writer.update_current_logical_size(pending_nblocks * pg_constants::BLCKSZ as isize); + 
writer.update_current_logical_size(pending_nblocks * BLCKSZ as isize); self.pending_nblocks = 0; } @@ -940,7 +942,7 @@ impl<'a, T: DatadirTimeline> DatadirModification<'a, T> { writer.finish_write(lsn); if pending_nblocks != 0 { - writer.update_current_logical_size(pending_nblocks * pg_constants::BLCKSZ as isize); + writer.update_current_logical_size(pending_nblocks * BLCKSZ as isize); } Ok(()) @@ -1014,7 +1016,7 @@ struct SlruSegmentDirectory { segments: HashSet, } -static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; pg_constants::BLCKSZ as usize]); +static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]); // Layout of the Key address space // diff --git a/pageserver/src/reltag.rs b/pageserver/src/reltag.rs index fadd41f547..e3d08f8b3d 100644 --- a/pageserver/src/reltag.rs +++ b/pageserver/src/reltag.rs @@ -2,8 +2,9 @@ use serde::{Deserialize, Serialize}; use std::cmp::Ordering; use std::fmt; -use postgres_ffi::relfile_utils::forknumber_to_name; -use postgres_ffi::{pg_constants, Oid}; +use postgres_ffi::v14::pg_constants; +use postgres_ffi::v14::relfile_utils::forknumber_to_name; +use postgres_ffi::Oid; /// /// Relation data file segment id throughout the Postgres cluster. 
diff --git a/pageserver/src/timelines.rs b/pageserver/src/timelines.rs index ed5975d3bd..0d35195691 100644 --- a/pageserver/src/timelines.rs +++ b/pageserver/src/timelines.rs @@ -3,7 +3,7 @@ // use anyhow::{bail, ensure, Context, Result}; -use postgres_ffi::ControlFileData; + use std::{ fs, path::Path, @@ -69,16 +69,6 @@ pub fn create_repo( ))) } -// Returns checkpoint LSN from controlfile -fn get_lsn_from_controlfile(path: &Path) -> Result { - // Read control file to extract the LSN - let controlfile_path = path.join("global").join("pg_control"); - let controlfile = ControlFileData::decode(&fs::read(controlfile_path)?)?; - let lsn = controlfile.checkPoint; - - Ok(Lsn(lsn)) -} - // Create the cluster temporarily in 'initdbpath' directory inside the repository // to get bootstrap data for timeline initialization. // @@ -128,7 +118,7 @@ fn bootstrap_timeline( run_initdb(conf, &initdb_path)?; let pgdata_path = initdb_path; - let lsn = get_lsn_from_controlfile(&pgdata_path)?.align(); + let lsn = import_datadir::get_lsn_from_controlfile(&pgdata_path)?.align(); // Import the contents of the data directory at the initial checkpoint // LSN, and any WAL after that. diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index b8064849e0..1b046b9f33 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -22,8 +22,8 @@ //! bespoken Rust code. 
use anyhow::Context; -use postgres_ffi::nonrelfile_utils::clogpage_precedes; -use postgres_ffi::nonrelfile_utils::slru_may_delete_clogsegment; +use postgres_ffi::v14::nonrelfile_utils::clogpage_precedes; +use postgres_ffi::v14::nonrelfile_utils::slru_may_delete_clogsegment; use postgres_ffi::{page_is_new, page_set_lsn}; use anyhow::Result; @@ -33,10 +33,12 @@ use tracing::*; use crate::pgdatadir_mapping::*; use crate::reltag::{RelTag, SlruKind}; use crate::walrecord::*; -use postgres_ffi::nonrelfile_utils::mx_offset_to_member_segment; -use postgres_ffi::xlog_utils::*; +use postgres_ffi::v14::nonrelfile_utils::mx_offset_to_member_segment; +use postgres_ffi::v14::pg_constants; +use postgres_ffi::v14::xlog_utils::*; +use postgres_ffi::v14::CheckPoint; use postgres_ffi::TransactionId; -use postgres_ffi::{pg_constants, CheckPoint}; +use postgres_ffi::BLCKSZ; use utils::lsn::Lsn; static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; 8192]); @@ -293,7 +295,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> { // Extract page image from FPI record let img_len = blk.bimg_len as usize; let img_offs = blk.bimg_offset as usize; - let mut image = BytesMut::with_capacity(pg_constants::BLCKSZ as usize); + let mut image = BytesMut::with_capacity(BLCKSZ as usize); image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]); if blk.hole_length != 0 { @@ -309,7 +311,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> { if !page_is_new(&image) { page_set_lsn(&mut image, lsn) } - assert_eq!(image.len(), pg_constants::BLCKSZ as usize); + assert_eq!(image.len(), BLCKSZ as usize); self.put_rel_page_image(modification, rel, blk.blkno, image.freeze())?; } else { let rec = ZenithWalRecord::Postgres { diff --git a/pageserver/src/walreceiver/walreceiver_connection.rs b/pageserver/src/walreceiver/walreceiver_connection.rs index 16a1f232e3..0688086117 100644 --- a/pageserver/src/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/walreceiver/walreceiver_connection.rs @@ -27,7 +27,7 
@@ use crate::{ walingest::WalIngest, walrecord::DecodedWALRecord, }; -use postgres_ffi::waldecoder::WalStreamDecoder; +use postgres_ffi::v14::waldecoder::WalStreamDecoder; use utils::{lsn::Lsn, pq_proto::ReplicationFeedback, zid::ZTenantTimelineId}; /// Status of the connection. diff --git a/pageserver/src/walrecord.rs b/pageserver/src/walrecord.rs index 6b01d52005..c56b1c6c0c 100644 --- a/pageserver/src/walrecord.rs +++ b/pageserver/src/walrecord.rs @@ -3,9 +3,10 @@ //! use anyhow::Result; use bytes::{Buf, Bytes}; -use postgres_ffi::pg_constants; -use postgres_ffi::xlog_utils::{TimestampTz, XLOG_SIZE_OF_XLOG_RECORD}; -use postgres_ffi::XLogRecord; +use postgres_ffi::v14::pg_constants; +use postgres_ffi::v14::xlog_utils::{TimestampTz, XLOG_SIZE_OF_XLOG_RECORD}; +use postgres_ffi::v14::XLogRecord; +use postgres_ffi::BLCKSZ; use postgres_ffi::{BlockNumber, OffsetNumber}; use postgres_ffi::{MultiXactId, MultiXactOffset, MultiXactStatus, Oid, TransactionId}; use serde::{Deserialize, Serialize}; @@ -618,7 +619,7 @@ pub fn decode_wal_record( blk.hole_length = 0; } } else { - blk.hole_length = pg_constants::BLCKSZ - blk.bimg_len; + blk.hole_length = BLCKSZ - blk.bimg_len; } datatotal += blk.bimg_len as u32; blocks_total_len += blk.bimg_len as u32; @@ -628,9 +629,7 @@ pub fn decode_wal_record( * bimg_len < BLCKSZ if the HAS_HOLE flag is set. */ if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE != 0 - && (blk.hole_offset == 0 - || blk.hole_length == 0 - || blk.bimg_len == pg_constants::BLCKSZ) + && (blk.hole_offset == 0 || blk.hole_length == 0 || blk.bimg_len == BLCKSZ) { // TODO /* @@ -667,7 +666,7 @@ pub fn decode_wal_record( * flag is set. 
*/ if (blk.bimg_info & pg_constants::BKPIMAGE_IS_COMPRESSED == 0) - && blk.bimg_len == pg_constants::BLCKSZ + && blk.bimg_len == BLCKSZ { // TODO /* @@ -685,7 +684,7 @@ pub fn decode_wal_record( */ if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE == 0 && blk.bimg_info & pg_constants::BKPIMAGE_IS_COMPRESSED == 0 - && blk.bimg_len != pg_constants::BLCKSZ + && blk.bimg_len != BLCKSZ { // TODO /* diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 57817dbc9c..9cf347573a 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -44,11 +44,12 @@ use crate::reltag::{RelTag, SlruKind}; use crate::repository::Key; use crate::walrecord::ZenithWalRecord; use metrics::{register_histogram, register_int_counter, Histogram, IntCounter}; -use postgres_ffi::nonrelfile_utils::mx_offset_to_flags_bitshift; -use postgres_ffi::nonrelfile_utils::mx_offset_to_flags_offset; -use postgres_ffi::nonrelfile_utils::mx_offset_to_member_offset; -use postgres_ffi::nonrelfile_utils::transaction_id_set_status; -use postgres_ffi::pg_constants; +use postgres_ffi::v14::nonrelfile_utils::{ + mx_offset_to_flags_bitshift, mx_offset_to_flags_offset, mx_offset_to_member_offset, + transaction_id_set_status, +}; +use postgres_ffi::v14::pg_constants; +use postgres_ffi::BLCKSZ; /// /// `RelTag` + block number (`blknum`) gives us a unique id of the page in the cluster. @@ -417,10 +418,10 @@ impl PostgresRedoManager { } // Append the timestamp - if page.len() == pg_constants::BLCKSZ as usize + 8 { - page.truncate(pg_constants::BLCKSZ as usize); + if page.len() == BLCKSZ as usize + 8 { + page.truncate(BLCKSZ as usize); } - if page.len() == pg_constants::BLCKSZ as usize { + if page.len() == BLCKSZ as usize { page.extend_from_slice(×tamp.to_be_bytes()); } else { warn!( @@ -741,7 +742,7 @@ impl PostgresRedoProcess { // We expect the WAL redo process to respond with an 8k page image. We read it // into this buffer. 
- let mut resultbuf = vec![0; pg_constants::BLCKSZ.into()]; + let mut resultbuf = vec![0; BLCKSZ.into()]; let mut nresult: usize = 0; // # of bytes read into 'resultbuf' so far // Prepare for calling poll() @@ -754,7 +755,7 @@ impl PostgresRedoProcess { // We do three things simultaneously: send the old base image and WAL records to // the child process's stdin, read the result from child's stdout, and forward any logging // information that the child writes to its stderr to the page server's log. - while nresult < pg_constants::BLCKSZ.into() { + while nresult < BLCKSZ.into() { // If we have more data to write, wake up if 'stdin' becomes writeable or // we have data to read. Otherwise only wake up if there's data to read. let nfds = if nwrite < writebuf.len() { 3 } else { 2 }; diff --git a/safekeeper/src/handler.rs b/safekeeper/src/handler.rs index a8121e829e..63bc9bd517 100644 --- a/safekeeper/src/handler.rs +++ b/safekeeper/src/handler.rs @@ -9,7 +9,7 @@ use crate::timeline::{Timeline, TimelineTools}; use crate::SafeKeeperConf; use anyhow::{bail, Context, Result}; -use postgres_ffi::xlog_utils::PG_TLI; +use postgres_ffi::PG_TLI; use regex::Regex; use std::str::FromStr; use std::sync::Arc; diff --git a/safekeeper/src/json_ctrl.rs b/safekeeper/src/json_ctrl.rs index 97fb3654d2..3f84e7b183 100644 --- a/safekeeper/src/json_ctrl.rs +++ b/safekeeper/src/json_ctrl.rs @@ -7,8 +7,7 @@ //! 
use anyhow::Result; -use bytes::{BufMut, Bytes, BytesMut}; -use crc32c::crc32c_append; +use bytes::Bytes; use serde::{Deserialize, Serialize}; use tracing::*; @@ -19,9 +18,8 @@ use crate::safekeeper::{ }; use crate::safekeeper::{SafeKeeperState, Term, TermHistory, TermSwitchEntry}; use crate::timeline::TimelineTools; -use postgres_ffi::pg_constants; -use postgres_ffi::xlog_utils; -use postgres_ffi::{uint32, uint64, Oid, XLogRecord}; +use postgres_ffi::v14::pg_constants; +use postgres_ffi::v14::xlog_utils; use utils::{ lsn::Lsn, postgres_backend::PostgresBackend, @@ -144,7 +142,7 @@ fn append_logical_message( spg: &mut SafekeeperPostgresHandler, msg: &AppendLogicalMessage, ) -> Result { - let wal_data = encode_logical_message(&msg.lm_prefix, &msg.lm_message); + let wal_data = xlog_utils::encode_logical_message(&msg.lm_prefix, &msg.lm_message); let sk_state = spg.timeline.get().get_state().1; let begin_lsn = msg.begin_lsn; @@ -182,90 +180,3 @@ fn append_logical_message( append_response, }) } - -#[repr(C)] -#[derive(Debug, Clone, Default, Serialize, Deserialize)] -struct XlLogicalMessage { - db_id: Oid, - transactional: uint32, // bool, takes 4 bytes due to alignment in C structures - prefix_size: uint64, - message_size: uint64, -} - -impl XlLogicalMessage { - pub fn encode(&self) -> Bytes { - use utils::bin_ser::LeSer; - self.ser().unwrap().into() - } -} - -/// Create new WAL record for non-transactional logical message. -/// Used for creating artificial WAL for tests, as LogicalMessage -/// record is basically no-op. 
-fn encode_logical_message(prefix: &str, message: &str) -> Vec<u8> { - let mut prefix_bytes = BytesMut::with_capacity(prefix.len() + 1); - prefix_bytes.put(prefix.as_bytes()); - prefix_bytes.put_u8(0); - - let message_bytes = message.as_bytes(); - - let logical_message = XlLogicalMessage { - db_id: 0, - transactional: 0, - prefix_size: prefix_bytes.len() as u64, - message_size: message_bytes.len() as u64, - }; - - let mainrdata = logical_message.encode(); - let mainrdata_len: usize = mainrdata.len() + prefix_bytes.len() + message_bytes.len(); - // only short mainrdata is supported for now - assert!(mainrdata_len <= 255); - let mainrdata_len = mainrdata_len as u8; - - let mut data: Vec<u8> = vec![pg_constants::XLR_BLOCK_ID_DATA_SHORT, mainrdata_len]; - data.extend_from_slice(&mainrdata); - data.extend_from_slice(&prefix_bytes); - data.extend_from_slice(message_bytes); - - let total_len = xlog_utils::XLOG_SIZE_OF_XLOG_RECORD + data.len(); - - let mut header = XLogRecord { - xl_tot_len: total_len as u32, - xl_xid: 0, - xl_prev: 0, - xl_info: 0, - xl_rmid: 21, - __bindgen_padding_0: [0u8; 2usize], - xl_crc: 0, // crc will be calculated later - }; - - let header_bytes = header.encode().expect("failed to encode header"); - let crc = crc32c_append(0, &data); - let crc = crc32c_append(crc, &header_bytes[0..xlog_utils::XLOG_RECORD_CRC_OFFS]); - header.xl_crc = crc; - - let mut wal: Vec<u8> = Vec::new(); - wal.extend_from_slice(&header.encode().expect("failed to encode header")); - wal.extend_from_slice(&data); - - // WAL start position must be aligned at 8 bytes, - // this will add padding for the next WAL record.
- const PADDING: usize = 8; - let padding_rem = wal.len() % PADDING; - if padding_rem != 0 { - wal.resize(wal.len() + PADDING - padding_rem, 0); - } - - wal -} - -#[test] -fn test_encode_logical_message() { - let expected = [ - 64, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 21, 0, 0, 170, 34, 166, 227, 255, 38, - 0, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 112, 114, 101, 102, - 105, 120, 0, 109, 101, 115, 115, 97, 103, 101, - ]; - let actual = encode_logical_message("prefix", "message"); - assert_eq!(expected, actual[..]); -} diff --git a/safekeeper/src/metrics.rs b/safekeeper/src/metrics.rs index fe4f9d231c..648f0634f8 100644 --- a/safekeeper/src/metrics.rs +++ b/safekeeper/src/metrics.rs @@ -7,7 +7,7 @@ use metrics::{ proto::MetricFamily, Gauge, IntGaugeVec, }; -use postgres_ffi::xlog_utils::XLogSegNo; +use postgres_ffi::v14::xlog_utils::XLogSegNo; use utils::{lsn::Lsn, zid::ZTenantTimelineId}; use crate::{ diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index 88747f14e5..22f8ca2de4 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -5,9 +5,7 @@ use byteorder::{LittleEndian, ReadBytesExt}; use bytes::{Buf, BufMut, Bytes, BytesMut}; use etcd_broker::subscription_value::SkTimelineInfo; -use postgres_ffi::xlog_utils::TimeLineID; - -use postgres_ffi::xlog_utils::XLogSegNo; +use postgres_ffi::v14::xlog_utils::{TimeLineID, XLogSegNo, MAX_SEND_SIZE}; use serde::{Deserialize, Serialize}; use std::cmp::max; use std::cmp::min; @@ -19,7 +17,6 @@ use crate::control_file; use crate::send_wal::HotStandbyFeedback; use crate::wal_storage; -use postgres_ffi::xlog_utils::MAX_SEND_SIZE; use utils::{ bin_ser::LeSer, lsn::Lsn, diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs index 4a9c56859f..243d7bf7d0 100644 --- a/safekeeper/src/send_wal.rs +++ b/safekeeper/src/send_wal.rs @@ -6,7 +6,7 @@ use crate::timeline::{ReplicaState, Timeline, TimelineTools}; use 
crate::wal_storage::WalReader; use anyhow::{bail, Context, Result}; -use postgres_ffi::xlog_utils::{get_current_timestamp, TimestampTz, MAX_SEND_SIZE}; +use postgres_ffi::v14::xlog_utils::{get_current_timestamp, TimestampTz, MAX_SEND_SIZE}; use bytes::Bytes; use serde::{Deserialize, Serialize}; diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index 161fca3595..3a10c5d59e 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -4,8 +4,9 @@ use anyhow::{bail, Context, Result}; use etcd_broker::subscription_value::SkTimelineInfo; + use once_cell::sync::Lazy; -use postgres_ffi::xlog_utils::XLogSegNo; +use postgres_ffi::v14::xlog_utils::XLogSegNo; use serde::Serialize; use tokio::sync::watch; diff --git a/safekeeper/src/wal_backup.rs b/safekeeper/src/wal_backup.rs index b2f9d8d4f3..3552452470 100644 --- a/safekeeper/src/wal_backup.rs +++ b/safekeeper/src/wal_backup.rs @@ -11,7 +11,8 @@ use std::pin::Pin; use std::sync::Arc; use std::time::Duration; -use postgres_ffi::xlog_utils::{XLogFileName, XLogSegNo, XLogSegNoOffsetToRecPtr, PG_TLI}; +use postgres_ffi::v14::xlog_utils::{XLogFileName, XLogSegNo, XLogSegNoOffsetToRecPtr}; +use postgres_ffi::PG_TLI; use remote_storage::{GenericRemoteStorage, RemoteStorage}; use tokio::fs::File; use tokio::runtime::Builder; diff --git a/safekeeper/src/wal_storage.rs b/safekeeper/src/wal_storage.rs index 5f4bf588c7..6a45ae1411 100644 --- a/safekeeper/src/wal_storage.rs +++ b/safekeeper/src/wal_storage.rs @@ -13,9 +13,10 @@ use std::pin::Pin; use tokio::io::AsyncRead; use once_cell::sync::Lazy; -use postgres_ffi::xlog_utils::{ - find_end_of_wal, IsPartialXLogFileName, IsXLogFileName, XLogFromFileName, XLogSegNo, PG_TLI, +use postgres_ffi::v14::xlog_utils::{ + find_end_of_wal, IsPartialXLogFileName, IsXLogFileName, XLogFromFileName, XLogSegNo, }; +use postgres_ffi::PG_TLI; use std::cmp::min; use std::fs::{self, remove_file, File, OpenOptions}; @@ -30,9 +31,10 @@ use 
crate::safekeeper::SafeKeeperState; use crate::wal_backup::read_object; use crate::SafeKeeperConf; -use postgres_ffi::xlog_utils::{XLogFileName, XLOG_BLCKSZ}; +use postgres_ffi::v14::xlog_utils::XLogFileName; +use postgres_ffi::XLOG_BLCKSZ; -use postgres_ffi::waldecoder::WalStreamDecoder; +use postgres_ffi::v14::waldecoder::WalStreamDecoder; use metrics::{register_histogram_vec, Histogram, HistogramVec, DISK_WRITE_SECONDS_BUCKETS};