diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs index a7d14decad..79e69caba5 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -17,8 +17,7 @@ use std::time::SystemTime; use tar::{Builder, Header}; use walkdir::WalkDir; -use crate::repository::{DatabaseTag, ObjectTag, SlruBufferTag, Timeline}; -use postgres_ffi::nonrelfile_utils::*; +use crate::repository::{DatabaseTag, ObjectTag, Timeline}; use postgres_ffi::relfile_utils::*; use postgres_ffi::xlog_utils::*; use postgres_ffi::*; @@ -188,21 +187,14 @@ impl<'a> Basebackup<'a> { Ok(()) } - // Check transaction status - fn get_tx_status(&self, xid: TransactionId) -> anyhow::Result { - let blknum = xid / pg_constants::CLOG_XACTS_PER_PAGE; - let tag = ObjectTag::Clog(SlruBufferTag { blknum }); - let clog_page = self.timeline.get_page_at_lsn(tag, self.lsn)?; - let status = transaction_id_get_status(xid, &clog_page[..]); - Ok(status) - } - // // Extract twophase state files // fn add_twophase_file(&mut self, tag: &ObjectTag, xid: TransactionId) -> anyhow::Result<()> { // Include in tarball two-phase files only of in-progress transactions - if self.get_tx_status(xid)? == pg_constants::TRANSACTION_STATUS_IN_PROGRESS { + if self.timeline.get_tx_status(xid, self.lsn)? + == pg_constants::TRANSACTION_STATUS_IN_PROGRESS + { let img = self.timeline.get_page_at_lsn_nowait(*tag, self.lsn)?; let mut buf = BytesMut::new(); buf.extend_from_slice(&img[..]); @@ -225,8 +217,8 @@ impl<'a> Basebackup<'a> { let pg_control_bytes = self .timeline .get_page_at_lsn_nowait(ObjectTag::ControlFile, self.lsn)?; - let mut pg_control = postgres_ffi::decode_pg_control(pg_control_bytes)?; - let mut checkpoint = postgres_ffi::decode_checkpoint(checkpoint_bytes)?; + let mut pg_control = ControlFileData::decode(&pg_control_bytes)?; + let mut checkpoint = CheckPoint::decode(&checkpoint_bytes)?; // Here starts pg_resetwal inspired magic // Generate new pg_control and WAL needed for bootstrap @@ -234,7 +226,7 @@ impl<'a> Basebackup<'a> { let new_lsn = XLogSegNoOffsetToRecPtr( new_segno, - SizeOfXLogLongPHD as u32, + XLOG_SIZE_OF_XLOG_LONG_PHD as u32, pg_constants::WAL_SEGMENT_SIZE, ); checkpoint.redo = new_lsn; @@ -247,7 +239,7 @@ impl<'a> Basebackup<'a> { pg_control.checkPointCopy = checkpoint; //send pg_control - let pg_control_bytes = postgres_ffi::encode_pg_control(pg_control); + let pg_control_bytes = pg_control.encode(); let header = new_tar_header("global/pg_control", pg_control_bytes.len() as u64)?; self.ar.append(&header, &pg_control_bytes[..])?; diff --git a/pageserver/src/object_repository.rs b/pageserver/src/object_repository.rs index b8a6751fc5..32bfcea895 100644 --- a/pageserver/src/object_repository.rs +++ b/pageserver/src/object_repository.rs @@ -21,8 +21,8 @@ use crate::{PageServerConf, ZTimelineId}; use anyhow::{bail, Context, Result}; use bytes::Bytes; use log::*; +use postgres_ffi::pg_constants; use serde::{Deserialize, Serialize}; -use std::cmp::max; use std::collections::{BTreeMap, HashMap, HashSet}; use std::convert::TryInto; use std::sync::{Arc, Mutex, RwLock}; @@ -725,62 +725,160 @@ impl ObjectTimeline { // WAL is large enough to perform GC let now = Instant::now(); let mut truncated = 0u64; + let mut inspected = 0u64; let mut deleted = 0u64; - // Iterate through all relations - for rels in &self.obj_store.list_rels(self.timelineid, 0, 0, last_lsn)? { - let mut last_version = true; - let mut key = relation_size_key(self.timelineid, *rels); - let mut max_size = 0u32; - let mut relation_dropped = false; - - // Process relation metadata versions - for vers in self.obj_store.object_versions(&key, horizon)? { - let lsn = vers.0; - let rel_meta = RelationSizeEntry::des(&vers.1)?; - // If relation is dropped at the horizon, - // we can remove all its versions including last (Unlink) - match rel_meta { - RelationSizeEntry::Size(size) => max_size = max(max_size, size), - RelationSizeEntry::Unlink => { + // Iterate through all objects in timeline + for obj in self + .obj_store + .list_objects(self.timelineid, false, last_lsn)? + { + inspected += 1; + match obj { + // Prepared transactions + ObjectTag::TwoPhase(prepare) => { + let key = ObjectKey { + timeline: self.timelineid, + tag: obj, + }; + for vers in self.obj_store.object_versions(&key, horizon)? { + if self.get_tx_status(prepare.xid, horizon)? + != pg_constants::TRANSACTION_STATUS_IN_PROGRESS + { + let lsn = vers.0; + self.obj_store.unlink(&key, lsn)?; + deleted += 1; + } + } + } + ObjectTag::RelationMetadata(_) => { + // Do not need to reconstruct page images, + // just delete all old versions over horizon + let mut last_version = true; + let key = ObjectKey { + timeline: self.timelineid, + tag: obj, + }; + for vers in self.obj_store.object_versions(&key, horizon)? { + let lsn = vers.0; if last_version { - relation_dropped = true; - info!("Relation {:?} dropped", rels); + let content = vers.1; + match ObjectValue::des(&content[..])? { + ObjectValue::Unlink => { + self.obj_store.unlink(&key, lsn)?; + deleted += 1; + } + _ => (), // preserve last version + } + last_version = false; + truncated += 1; + } else { + self.obj_store.unlink(&key, lsn)?; + deleted += 1; } } } - if last_version { - last_version = false; - if !relation_dropped { - // preserve last version - continue; + ObjectTag::RelationBuffer(tag) => { + // Reconstruct last page + self.get_page_at_lsn_nowait(obj, last_lsn)?; + + // Reconstruct page at horizon unless relation was dropped + // and delete all older versions over horizon + let mut last_version = true; + let key = ObjectKey { + timeline: self.timelineid, + tag: obj, + }; + for vers in self.obj_store.object_versions(&key, horizon)? { + let lsn = vers.0; + if last_version { + truncated += 1; + last_version = false; + if let Some(rel_size) = self.relsize_get_nowait(tag.rel, lsn)? { + if rel_size > tag.blknum { + // preserve and materialize last version before deleting all preceeding + self.get_page_at_lsn_nowait(obj, lsn)?; + continue; + } + debug!("Drop last block {} of relation {:?} at {} because it is beyond relation size {}", tag.blknum, tag.rel, lsn, rel_size); + } else { + if let Some(rel_size) = + self.relsize_get_nowait(tag.rel, last_lsn)? + { + debug!("Preserve block {} of relation {:?} at {} because relation has size {} at {}", tag.rel, tag, lsn, rel_size, last_lsn); + continue; + } + debug!("Relation {:?} was dropped at {}", tag.rel, lsn); + } + // relation was dropped or truncated so this block can be removed + } + self.obj_store.unlink(&key, lsn)?; + deleted += 1; } } - self.obj_store.unlink(&key, lsn)?; - deleted += 1; - } - // Now process all relation blocks - for blknum in 0..max_size { - key.buf_tag.blknum = blknum; - last_version = true; - for vers in self.obj_store.object_versions(&key, horizon)? { - let lsn = vers.0; - if last_version { - last_version = false; - truncated += 1; - if !relation_dropped { - // preserve and materialize last version before deleting all preceeding - self.get_page_at_lsn_nowait(key.buf_tag, lsn)?; - continue; + // SLRU-s + ObjectTag::Clog(_) + | ObjectTag::MultiXactOffsets(_) + | ObjectTag::MultiXactMembers(_) => { + // Materialize last version + self.get_page_at_lsn_nowait(obj, last_lsn)?; + + // Remove old versions over horizon + let mut last_version = true; + let key = ObjectKey { + timeline: self.timelineid, + tag: obj, + }; + for vers in self.obj_store.object_versions(&key, horizon)? { + let lsn = vers.0; + if last_version { + let content = vers.1; + match ObjectValue::des(&content[..])? { + ObjectValue::Unlink => { + self.obj_store.unlink(&key, lsn)?; + deleted += 1; + } + ObjectValue::WALRecord(_) => { + // preserve and materialize last version before deleting all preceeding + self.get_page_at_lsn_nowait(obj, lsn)?; + } + _ => {} // do nothing if already materialized + } + last_version = false; + truncated += 1; + } else { + // delete deteriorated version + self.obj_store.unlink(&key, lsn)?; + deleted += 1; } } - self.obj_store.unlink(&key, lsn)?; - deleted += 1; } + // versioned alwaysmaterialized objects: no need to reconstruct pages + ObjectTag::Checkpoint | ObjectTag::ControlFile => { + // Remove old versions over horizon + let mut last_version = true; + let key = ObjectKey { + timeline: self.timelineid, + tag: obj, + }; + for vers in self.obj_store.object_versions(&key, horizon)? { + let lsn = vers.0; + if last_version { + // presrve last version + last_version = false; + truncated += 1; + } else { + // delete deteriorated version + self.obj_store.unlink(&key, lsn)?; + deleted += 1; + } + } + } + _ => (), // do nothing } } - info!("Garbage collection completed in {:?}: {} version histories truncated, {} versions deleted", - now.elapsed(), truncated, deleted); + info!("Garbage collection completed in {:?}:\n{} version chains inspected, {} version histories truncated, {} versions deleted", + now.elapsed(), inspected, truncated, deleted); } } } diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 0fb5878761..59b1606b06 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -229,7 +229,7 @@ impl PageServerHandler { PagestreamBeMessage::Nblocks(PagestreamStatusResponse { ok: true, n_blocks }) } PagestreamFeMessage::Read(req) => { - let buf_tag = BufferTag { + let tag = ObjectTag::RelationBuffer(BufferTag { rel: RelTag { spcnode: req.spcnode, dbnode: req.dbnode, @@ -237,9 +237,9 @@ impl PageServerHandler { forknum: req.forknum, }, blknum: req.blkno, - }; + }); - let read_response = match timeline.get_page_at_lsn(buf_tag, req.lsn) { + let read_response = match timeline.get_page_at_lsn(tag, req.lsn) { Ok(p) => PagestreamReadResponse { ok: true, n_blocks: 0, @@ -291,13 +291,17 @@ impl PageServerHandler { let snapshot_lsn = restore_local_repo::find_latest_snapshot(&self.conf, timelineid).unwrap(); let req_lsn = lsn.unwrap_or(snapshot_lsn); - basebackup::send_tarball_at_lsn( - &mut CopyDataSink { pgb }, - timelineid, - &timeline, - req_lsn, - snapshot_lsn, - )?; + { + let mut writer = CopyDataSink { pgb }; + let mut basebackup = basebackup::Basebackup::new( + &mut writer, + timelineid, + &timeline, + req_lsn, + snapshot_lsn, + ); + basebackup.send_tarball()?; + } pgb.write_message(&BeMessage::CopyDone)?; debug!("CopyDone sent!"); @@ -507,156 +511,6 @@ impl postgres_backend::Handler for PageServerHandler { pgb.flush()?; Ok(()) } - - fn handle_controlfile(&mut self) -> io::Result<()> { - self.write_message_noflush(&BeMessage::RowDescription)?; - self.write_message_noflush(&BeMessage::ControlFile)?; - self.write_message(&BeMessage::CommandComplete)?; - - Ok(()) - } - - fn handle_pagerequests(&mut self, timelineid: ZTimelineId) -> anyhow::Result<()> { - // Check that the timeline exists - let repository = page_cache::get_repository(); - let timeline = repository.get_timeline(timelineid).map_err(|_| { - anyhow!( - "client requested pagestream on timeline {} which does not exist in page server", - timelineid - ) - })?; - - /* switch client to COPYBOTH */ - self.stream.write_u8(b'W')?; - self.stream.write_i32::(4 + 1 + 2)?; - self.stream.write_u8(0)?; /* copy_is_binary */ - self.stream.write_i16::(0)?; /* numAttributes */ - self.stream.flush()?; - - while let Some(message) = self.read_message()? { - trace!("query({:?}): {:?}", timelineid, message); - - let copy_data_bytes = match message { - FeMessage::CopyData(bytes) => bytes, - _ => continue, - }; - - let zenith_fe_msg = PagestreamFeMessage::parse(copy_data_bytes)?; - - let response = match zenith_fe_msg { - PagestreamFeMessage::Exists(req) => { - let tag = RelTag { - spcnode: req.spcnode, - dbnode: req.dbnode, - relnode: req.relnode, - forknum: req.forknum, - }; - - let exist = timeline.get_rel_exists(tag, req.lsn).unwrap_or(false); - - PagestreamBeMessage::Status(PagestreamStatusResponse { - ok: exist, - n_blocks: 0, - }) - } - PagestreamFeMessage::Nblocks(req) => { - let tag = RelTag { - spcnode: req.spcnode, - dbnode: req.dbnode, - relnode: req.relnode, - forknum: req.forknum, - }; - - let n_blocks = timeline.get_rel_size(tag, req.lsn).unwrap_or(0); - - PagestreamBeMessage::Nblocks(PagestreamStatusResponse { ok: true, n_blocks }) - } - PagestreamFeMessage::Read(req) => { - let buf_tag = ObjectTag::RelationBuffer(BufferTag { - rel: RelTag { - spcnode: req.spcnode, - dbnode: req.dbnode, - relnode: req.relnode, - forknum: req.forknum, - }, - blknum: req.blkno, - }); - - let read_response = match timeline.get_page_at_lsn(buf_tag, req.lsn) { - Ok(p) => PagestreamReadResponse { - ok: true, - n_blocks: 0, - page: p, - }, - Err(e) => { - const ZERO_PAGE: [u8; 8192] = [0; 8192]; - error!("get_page_at_lsn: {}", e); - PagestreamReadResponse { - ok: false, - n_blocks: 0, - page: Bytes::from_static(&ZERO_PAGE), - } - } - }; - - PagestreamBeMessage::Read(read_response) - } - }; - - self.write_message(&BeMessage::CopyData(response.serialize()))?; - } - - Ok(()) - } - - fn handle_basebackup_request( - &mut self, - timelineid: ZTimelineId, - lsn: Option, - ) -> anyhow::Result<()> { - // check that the timeline exists - let repository = page_cache::get_repository(); - let timeline = repository.get_timeline(timelineid).map_err(|e| { - error!("error fetching timeline: {:?}", e); - anyhow!( - "client requested basebackup on timeline {} which does not exist in page server", - timelineid - ) - })?; - /* switch client to COPYOUT */ - let stream = &mut self.stream; - stream.write_u8(b'H')?; - stream.write_i32::(4 + 1 + 2)?; - stream.write_u8(0)?; /* copy_is_binary */ - stream.write_i16::(0)?; /* numAttributes */ - stream.flush()?; - info!("sent CopyOut"); - - /* Send a tarball of the latest snapshot on the timeline */ - - // find latest snapshot - let snapshot_lsn = - restore_local_repo::find_latest_snapshot(&self.conf, timelineid).unwrap(); - let req_lsn = lsn.unwrap_or_else(|| timeline.get_last_valid_lsn()); - { - let mut writer = CopyDataSink { stream }; - let mut basebackup = basebackup::Basebackup::new( - &mut writer, - timelineid, - &timeline, - req_lsn, - snapshot_lsn, - ); - basebackup.send_tarball()?; - } - // CopyDone - self.stream.write_u8(b'c')?; - self.stream.write_u32::(4)?; - self.stream.flush()?; - debug!("CopyDone sent!"); - - Ok(()) - } } /// diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index 0850dc6065..256af4d1c1 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -2,6 +2,8 @@ use crate::waldecoder::TransactionId; use crate::ZTimelineId; use anyhow::Result; use bytes::{Buf, BufMut, Bytes, BytesMut}; +use postgres_ffi::nonrelfile_utils::transaction_id_get_status; +use postgres_ffi::pg_constants; use postgres_ffi::relfile_utils::forknumber_to_name; use serde::{Deserialize, Serialize}; use std::collections::HashSet; @@ -106,6 +108,15 @@ pub trait Timeline: Send + Sync { /// Relation size is increased implicitly and decreased with Truncate updates. // TODO ordering guarantee? fn history<'a>(&'a self) -> Result>; + + // Check transaction status + fn get_tx_status(&self, xid: TransactionId, lsn: Lsn) -> anyhow::Result { + let blknum = xid / pg_constants::CLOG_XACTS_PER_PAGE; + let tag = ObjectTag::Clog(SlruBufferTag { blknum }); + let clog_page = self.get_page_at_lsn(tag, lsn)?; + let status = transaction_id_get_status(xid, &clog_page[..]); + Ok(status) + } } pub trait History: Iterator> { diff --git a/pageserver/src/restore_local_repo.rs b/pageserver/src/restore_local_repo.rs index b50e316431..aa2adf969d 100644 --- a/pageserver/src/restore_local_repo.rs +++ b/pageserver/src/restore_local_repo.rs @@ -288,11 +288,11 @@ pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint: let mut last_lsn = startpoint; let checkpoint_bytes = timeline.get_page_at_lsn_nowait(ObjectTag::Checkpoint, startpoint)?; - let mut checkpoint = decode_checkpoint(checkpoint_bytes)?; + let mut checkpoint = CheckPoint::decode(&checkpoint_bytes)?; if checkpoint.nextXid.value == 0 { let pg_control_bytes = timeline.get_page_at_lsn_nowait(ObjectTag::ControlFile, startpoint)?; - let pg_control = decode_pg_control(pg_control_bytes)?; + let pg_control = ControlFileData::decode(&pg_control_bytes)?; checkpoint = pg_control.checkPointCopy; } @@ -358,7 +358,7 @@ pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint: offset = 0; } info!("reached end of WAL at {}", last_lsn); - let checkpoint_bytes = encode_checkpoint(checkpoint); + let checkpoint_bytes = checkpoint.encode(); timeline.put_page_image(ObjectTag::Checkpoint, last_lsn, checkpoint_bytes)?; Ok(()) } diff --git a/pageserver/src/rocksdb_storage.rs b/pageserver/src/rocksdb_storage.rs index 39055ac4b1..2409313fa4 100644 --- a/pageserver/src/rocksdb_storage.rs +++ b/pageserver/src/rocksdb_storage.rs @@ -175,16 +175,16 @@ impl ObjectStore for RocksObjectStore { let key = StorageKey::des(iter.key().unwrap())?; if let ObjectTag::RelationBuffer(buf_tag) = key.obj_key.tag { if (spcnode != 0 && buf_tag.rel.spcnode != spcnode) - || (dbnode != 0 && buf_tag.rel.dbnode != dbnode) - { + || (dbnode != 0 && buf_tag.rel.dbnode != dbnode) + { break; } if key.lsn < lsn { rels.insert(buf_tag.rel); } - let mut next_tag = buf_tag.clone(); - next_tag.rel.relnode += 1; // skip to next relation - search_key = ObjectTag::RelationBuffernext_tag); + let mut next_tag = buf_tag.clone(); + next_tag.rel.relnode += 1; // skip to next relation + search_key.obj_key.tag = ObjectTag::RelationBuffer(next_tag); } else { break; } diff --git a/pageserver/src/waldecoder.rs b/pageserver/src/waldecoder.rs index 96b355f3c0..52dd2a0d20 100644 --- a/pageserver/src/waldecoder.rs +++ b/pageserver/src/waldecoder.rs @@ -7,6 +7,7 @@ use bytes::{Buf, BufMut, Bytes, BytesMut}; use log::*; use postgres_ffi::pg_constants; use postgres_ffi::xlog_utils::*; +use postgres_ffi::CheckPoint; use postgres_ffi::XLogLongPageHeaderData; use postgres_ffi::XLogPageHeaderData; use postgres_ffi::XLogRecord; diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index eb0a7e45a0..7aa4c3fa97 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -176,7 +176,7 @@ fn walreceiver_main( let mut waldecoder = WalStreamDecoder::new(startpoint); let checkpoint_bytes = timeline.get_page_at_lsn_nowait(ObjectTag::Checkpoint, startpoint)?; - let mut checkpoint = decode_checkpoint(checkpoint_bytes)?; + let mut checkpoint = CheckPoint::decode(&checkpoint_bytes)?; trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value); while let Some(replication_message) = physical_stream.next()? { @@ -196,12 +196,12 @@ fn walreceiver_main( waldecoder.feed_bytes(data); while let Some((lsn, recdata)) = waldecoder.poll_decode()? { - let old_checkpoint_bytes = encode_checkpoint(checkpoint); + let old_checkpoint_bytes = checkpoint.encode(); let decoded = decode_wal_record(&mut checkpoint, recdata.clone()); restore_local_repo::save_decoded_record(&*timeline, &decoded, recdata, lsn)?; last_rec_lsn = lsn; - let new_checkpoint_bytes = encode_checkpoint(checkpoint); + let new_checkpoint_bytes = checkpoint.encode(); if new_checkpoint_bytes != old_checkpoint_bytes { timeline.put_page_image( ObjectTag::Checkpoint, diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index c972813191..947c4851d6 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -42,7 +42,7 @@ use crate::waldecoder::{MultiXactId, XlMultiXactCreate}; use crate::PageServerConf; use postgres_ffi::nonrelfile_utils::transaction_id_set_status; use postgres_ffi::pg_constants; -use postgres_ffi::xlog_utils::XLogRecord; +use postgres_ffi::XLogRecord; /// /// WAL Redo Manager is responsible for replaying WAL records. diff --git a/postgres_ffi/src/lib.rs b/postgres_ffi/src/lib.rs index 970f4078e9..e7553721b2 100644 --- a/postgres_ffi/src/lib.rs +++ b/postgres_ffi/src/lib.rs @@ -4,7 +4,7 @@ include!(concat!(env!("OUT_DIR"), "/bindings.rs")); pub mod controlfile_utils; -pub mod pg_constants; pub mod nonrelfile_utils; +pub mod pg_constants; pub mod relfile_utils; pub mod xlog_utils; diff --git a/postgres_ffi/src/xlog_utils.rs b/postgres_ffi/src/xlog_utils.rs index ea8c4ae660..dd58f2ef25 100644 --- a/postgres_ffi/src/xlog_utils.rs +++ b/postgres_ffi/src/xlog_utils.rs @@ -7,9 +7,9 @@ // have been named the same as the corresponding PostgreSQL functions instead. // -use crate::encode_checkpoint; use crate::pg_constants; use crate::CheckPoint; +use crate::ControlFileData; use crate::FullTransactionId; use crate::XLogLongPageHeaderData; use crate::XLogPageHeaderData; @@ -37,6 +37,7 @@ pub const MAX_SEND_SIZE: usize = XLOG_BLCKSZ * 16; pub const XLOG_SIZE_OF_XLOG_SHORT_PHD: usize = std::mem::size_of::(); pub const XLOG_SIZE_OF_XLOG_LONG_PHD: usize = std::mem::size_of::(); pub const XLOG_SIZE_OF_XLOG_RECORD: usize = std::mem::size_of::(); +pub const SIZE_OF_XLOG_RECORD_DATA_HEADER_SHORT: usize = 1 * 2; pub type XLogRecPtr = u64; pub type TimeLineID = u32; @@ -398,7 +399,7 @@ pub fn generate_wal_segment(pg_control: &ControlFileData) -> Bytes { xlp_magic: XLOG_PAGE_MAGIC as u16, xlp_info: pg_constants::XLP_LONG_HEADER, xlp_tli: 1, // FIXME: always use Postgres timeline 1 - xlp_pageaddr: pg_control.checkPointCopy.redo - SizeOfXLogLongPHD as u64, + xlp_pageaddr: pg_control.checkPointCopy.redo - XLOG_SIZE_OF_XLOG_LONG_PHD as u64, xlp_rem_len: 0, } }, @@ -407,7 +408,7 @@ pub fn generate_wal_segment(pg_control: &ControlFileData) -> Bytes { xlp_xlog_blcksz: XLOG_BLCKSZ as u32, }; - let hdr_bytes = encode_xlog_long_phd(hdr); + let hdr_bytes = hdr.encode(); seg_buf.extend_from_slice(&hdr_bytes); let rec_hdr = XLogRecord { @@ -425,8 +426,8 @@ pub fn generate_wal_segment(pg_control: &ControlFileData) -> Bytes { rec_shord_hdr_bytes.put_u8(pg_constants::XLR_BLOCK_ID_DATA_SHORT); rec_shord_hdr_bytes.put_u8(SIZEOF_CHECKPOINT as u8); - let rec_bytes = encode_xlog_record(rec_hdr); - let checkpoint_bytes = encode_checkpoint(pg_control.checkPointCopy); + let rec_bytes = rec_hdr.encode(); + let checkpoint_bytes = pg_control.checkPointCopy.encode(); //calculate record checksum let mut crc = 0; diff --git a/postgres_ffi/xlog_ffi.h b/postgres_ffi/xlog_ffi.h index 68fe9b1bdb..e6db154696 100644 --- a/postgres_ffi/xlog_ffi.h +++ b/postgres_ffi/xlog_ffi.h @@ -1,2 +1,3 @@ #include "c.h" #include "access/xlog_internal.h" +#include "access/xlogrecord.h"