apply Lsn type everywhere

Use the `Lsn` type everywhere that I can find u64 being used to
represent an LSN.
This commit is contained in:
Eric Seppanen
2021-04-23 13:58:03 -07:00
parent f62ce4bcf7
commit 01e239afa3
6 changed files with 122 additions and 144 deletions

View File

@@ -47,6 +47,7 @@ use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};
use std::{convert::TryInto, ops::AddAssign};
use zenith_utils::lsn::Lsn;
use zenith_utils::seqwait::SeqWait;
// Timeout when waiting or WAL receiver to catch up to an LSN given in a GetPage@LSN call.
@@ -62,7 +63,7 @@ pub struct PageCache {
walredo_mgr: WalRedoManager,
// Allows .await on the arrival of a particular LSN.
seqwait_lsn: SeqWait,
seqwait_lsn: SeqWait<Lsn>,
// Counters, for metrics collection.
pub num_entries: AtomicU64,
@@ -120,9 +121,9 @@ struct PageCacheShared {
// walreceiver.rs instead of here, but it seems convenient to keep all three
// values together.
//
first_valid_lsn: u64,
last_valid_lsn: u64,
last_record_lsn: u64,
first_valid_lsn: Lsn,
last_valid_lsn: Lsn,
last_record_lsn: Lsn,
}
lazy_static! {
@@ -204,16 +205,16 @@ fn open_rocksdb(_conf: &PageServerConf, timelineid: ZTimelineId) -> rocksdb::DB
fn init_page_cache(conf: &PageServerConf, timelineid: ZTimelineId) -> PageCache {
PageCache {
shared: Mutex::new(PageCacheShared {
first_valid_lsn: 0,
last_valid_lsn: 0,
last_record_lsn: 0,
first_valid_lsn: Lsn(0),
last_valid_lsn: Lsn(0),
last_record_lsn: Lsn(0),
}),
db: open_rocksdb(&conf, timelineid),
walredo_mgr: WalRedoManager::new(conf, timelineid),
seqwait_lsn: SeqWait::new(0),
seqwait_lsn: SeqWait::new(Lsn(0)),
num_entries: AtomicU64::new(0),
num_page_images: AtomicU64::new(0),
@@ -242,18 +243,18 @@ fn init_page_cache(conf: &PageServerConf, timelineid: ZTimelineId) -> PageCache
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
pub struct CacheKey {
pub tag: BufferTag,
pub lsn: u64,
pub lsn: Lsn,
}
impl CacheKey {
pub fn pack(&self, buf: &mut BytesMut) {
self.tag.pack(buf);
buf.put_u64(self.lsn);
buf.put_u64(self.lsn.0);
}
pub fn unpack(buf: &mut BytesMut) -> CacheKey {
CacheKey {
tag: BufferTag::unpack(buf),
lsn: buf.get_u64(),
lsn: Lsn::from(buf.get_u64()),
}
}
}
@@ -343,7 +344,7 @@ impl BufferTag {
#[derive(Debug, Clone)]
pub struct WALRecord {
pub lsn: u64, // LSN at the *end* of the record
pub lsn: Lsn, // LSN at the *end* of the record
pub will_init: bool,
pub truncate: bool,
pub rec: Bytes,
@@ -355,7 +356,7 @@ pub struct WALRecord {
impl WALRecord {
pub fn pack(&self, buf: &mut BytesMut) {
buf.put_u64(self.lsn);
buf.put_u64(self.lsn.0);
buf.put_u8(self.will_init as u8);
buf.put_u8(self.truncate as u8);
buf.put_u32(self.main_data_offset);
@@ -363,7 +364,7 @@ impl WALRecord {
buf.put_slice(&self.rec[..]);
}
pub fn unpack(buf: &mut BytesMut) -> WALRecord {
let lsn = buf.get_u64();
let lsn = Lsn::from(buf.get_u64());
let will_init = buf.get_u8() != 0;
let truncate = buf.get_u8() != 0;
let main_data_offset = buf.get_u32();
@@ -387,7 +388,7 @@ impl PageCache {
///
/// Returns an 8k page image
///
pub async fn get_page_at_lsn(&self, tag: BufferTag, req_lsn: u64) -> anyhow::Result<Bytes> {
pub async fn get_page_at_lsn(&self, tag: BufferTag, req_lsn: Lsn) -> anyhow::Result<Bytes> {
self.num_getpage_requests.fetch_add(1, Ordering::Relaxed);
let lsn = self.wait_lsn(req_lsn).await?;
@@ -448,7 +449,7 @@ impl PageCache {
///
/// Get size of relation at given LSN.
///
pub async fn relsize_get(&self, rel: &RelTag, lsn: u64) -> anyhow::Result<u32> {
pub async fn relsize_get(&self, rel: &RelTag, lsn: Lsn) -> anyhow::Result<u32> {
self.wait_lsn(lsn).await?;
return self.relsize_get_nowait(rel, lsn);
}
@@ -456,7 +457,7 @@ impl PageCache {
///
/// Does relation exist at given LSN?
///
pub async fn relsize_exist(&self, rel: &RelTag, req_lsn: u64) -> anyhow::Result<bool> {
pub async fn relsize_exist(&self, rel: &RelTag, req_lsn: Lsn) -> anyhow::Result<bool> {
let lsn = self.wait_lsn(req_lsn).await?;
let key = CacheKey {
@@ -497,7 +498,7 @@ impl PageCache {
pub fn collect_records_for_apply(
&self,
tag: BufferTag,
lsn: u64,
lsn: Lsn,
) -> (Option<Bytes>, Vec<WALRecord>) {
let mut buf = BytesMut::new();
let key = CacheKey { tag, lsn };
@@ -576,7 +577,7 @@ impl PageCache {
let mut key = CacheKey { tag, lsn: rec.lsn };
// What was the size of the relation before this record?
let last_lsn = self.last_valid_lsn.load(Ordering::Acquire);
let last_lsn = Lsn::from(self.last_valid_lsn.load(Ordering::Acquire));
let old_rel_size = self.relsize_get_nowait(&tag.rel, last_lsn)?;
let content = CacheEntryContent {
@@ -606,7 +607,7 @@ impl PageCache {
///
/// Memorize a full image of a page version
///
pub fn put_page_image(&self, tag: BufferTag, lsn: u64, img: Bytes) {
pub fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes) {
let key = CacheKey { tag, lsn };
let content = CacheEntryContent {
page_image: Some(img),
@@ -629,7 +630,7 @@ impl PageCache {
pub fn create_database(
&self,
lsn: u64,
lsn: Lsn,
db_id: Oid,
tablespace_id: Oid,
src_db_id: Oid,
@@ -646,7 +647,7 @@ impl PageCache {
},
blknum: 0,
},
lsn: 0,
lsn: Lsn(0),
};
key.pack(&mut buf);
let mut iter = self.db.raw_iterator();
@@ -679,22 +680,19 @@ impl PageCache {
}
/// Remember that WAL has been received and added to the page cache up to the given LSN
pub fn advance_last_valid_lsn(&self, lsn: u64) {
pub fn advance_last_valid_lsn(&self, lsn: Lsn) {
let mut shared = self.shared.lock().unwrap();
// Can't move backwards.
let oldlsn = shared.last_valid_lsn;
if lsn >= oldlsn {
shared.last_valid_lsn = lsn;
self.last_valid_lsn.store(lsn, Ordering::Relaxed);
self.last_valid_lsn.store(lsn.0, Ordering::Relaxed);
self.seqwait_lsn.advance(lsn);
} else {
warn!(
"attempted to move last valid LSN backwards (was {:X}/{:X}, new {:X}/{:X})",
oldlsn >> 32,
oldlsn & 0xffffffff,
lsn >> 32,
lsn & 0xffffffff
"attempted to move last valid LSN backwards (was {}, new {})",
oldlsn, lsn
);
}
}
@@ -704,7 +702,7 @@ impl PageCache {
///
/// NOTE: this updates last_valid_lsn as well.
///
pub fn advance_last_record_lsn(&self, lsn: u64) {
pub fn advance_last_record_lsn(&self, lsn: Lsn) {
let mut shared = self.shared.lock().unwrap();
// Can't move backwards.
@@ -713,8 +711,8 @@ impl PageCache {
shared.last_valid_lsn = lsn;
shared.last_record_lsn = lsn;
self.last_valid_lsn.store(lsn, Ordering::Relaxed);
self.last_record_lsn.store(lsn, Ordering::Relaxed);
self.last_valid_lsn.store(lsn.0, Ordering::Relaxed);
self.last_record_lsn.store(lsn.0, Ordering::Relaxed);
self.seqwait_lsn.advance(lsn);
}
@@ -723,7 +721,7 @@ impl PageCache {
///
/// TODO: This should be called by garbage collection, so that if an older
/// page is requested, we will return an error to the requestor.
pub fn _advance_first_valid_lsn(&self, lsn: u64) {
pub fn _advance_first_valid_lsn(&self, lsn: Lsn) {
let mut shared = self.shared.lock().unwrap();
// Can't move backwards.
@@ -731,29 +729,29 @@ impl PageCache {
// Can't overtake last_valid_lsn (except when we're
// initializing the system and last_valid_lsn hasn't been set yet.
assert!(shared.last_valid_lsn == 0 || lsn < shared.last_valid_lsn);
assert!(shared.last_valid_lsn == Lsn(0) || lsn < shared.last_valid_lsn);
shared.first_valid_lsn = lsn;
self.first_valid_lsn.store(lsn, Ordering::Relaxed);
self.first_valid_lsn.store(lsn.0, Ordering::Relaxed);
}
pub fn init_valid_lsn(&self, lsn: u64) {
pub fn init_valid_lsn(&self, lsn: Lsn) {
let mut shared = self.shared.lock().unwrap();
assert!(shared.first_valid_lsn == 0);
assert!(shared.last_valid_lsn == 0);
assert!(shared.last_record_lsn == 0);
assert!(shared.first_valid_lsn == Lsn(0));
assert!(shared.last_valid_lsn == Lsn(0));
assert!(shared.last_record_lsn == Lsn(0));
shared.first_valid_lsn = lsn;
shared.last_valid_lsn = lsn;
shared.last_record_lsn = lsn;
self.first_valid_lsn.store(lsn, Ordering::Relaxed);
self.last_valid_lsn.store(lsn, Ordering::Relaxed);
self.last_record_lsn.store(lsn, Ordering::Relaxed);
self.first_valid_lsn.store(lsn.0, Ordering::Relaxed);
self.last_valid_lsn.store(lsn.0, Ordering::Relaxed);
self.last_record_lsn.store(lsn.0, Ordering::Relaxed);
}
pub fn get_last_valid_lsn(&self) -> u64 {
pub fn get_last_valid_lsn(&self) -> Lsn {
let shared = self.shared.lock().unwrap();
shared.last_record_lsn
@@ -781,8 +779,8 @@ impl PageCache {
//
// The caller must ensure that WAL has been received up to 'lsn'.
//
fn relsize_get_nowait(&self, rel: &RelTag, lsn: u64) -> anyhow::Result<u32> {
assert!(lsn <= self.last_valid_lsn.load(Ordering::Acquire));
fn relsize_get_nowait(&self, rel: &RelTag, lsn: Lsn) -> anyhow::Result<u32> {
assert!(lsn.0 <= self.last_valid_lsn.load(Ordering::Acquire));
let mut key = CacheKey {
tag: BufferTag {
@@ -833,7 +831,7 @@ impl PageCache {
loop {
thread::sleep(conf.gc_period);
let last_lsn = self.get_last_valid_lsn();
if last_lsn > conf.gc_horizon {
if last_lsn.0 > conf.gc_horizon {
let horizon = last_lsn - conf.gc_horizon;
let mut maxkey = CacheKey {
tag: BufferTag {
@@ -845,7 +843,7 @@ impl PageCache {
},
blknum: u32::MAX,
},
lsn: u64::MAX,
lsn: Lsn::MAX,
};
let now = Instant::now();
let mut reconstructed = 0u64;
@@ -873,7 +871,7 @@ impl PageCache {
maxkey.lsn = min(horizon, last_lsn); // do not remove last version
let mut minkey = maxkey.clone();
minkey.lsn = 0; // first version
minkey.lsn = Lsn(0); // first version
// reconstruct most recent page version
if (v[0] & PAGE_IMAGE_FLAG) == 0 {
@@ -942,12 +940,12 @@ impl PageCache {
//
// Wait until WAL has been received up to the given LSN.
//
async fn wait_lsn(&self, req_lsn: u64) -> anyhow::Result<u64> {
async fn wait_lsn(&self, req_lsn: Lsn) -> anyhow::Result<Lsn> {
let mut lsn = req_lsn;
//When invalid LSN is requested, it means "don't wait, return latest version of the page"
//This is necessary for bootstrap.
if lsn == 0 {
lsn = self.last_valid_lsn.load(Ordering::Acquire);
if lsn == Lsn(0) {
lsn = Lsn::from(self.last_valid_lsn.load(Ordering::Acquire));
trace!(
"walreceiver doesn't work yet last_valid_lsn {}, requested {}",
self.last_valid_lsn.load(Ordering::Acquire),
@@ -960,9 +958,8 @@ impl PageCache {
.await
.with_context(|| {
format!(
"Timed out while waiting for WAL record at LSN {:X}/{:X} to arrive",
lsn >> 32,
lsn & 0xffff_ffff
"Timed out while waiting for WAL record at LSN {} to arrive",
lsn
)
})?;

View File

@@ -25,6 +25,7 @@ use tokio::runtime;
use tokio::runtime::Runtime;
use tokio::sync::mpsc;
use tokio::task;
use zenith_utils::lsn::Lsn;
use crate::basebackup;
use crate::page_cache;
@@ -84,7 +85,7 @@ struct ZenithRequest {
relnode: u32,
forknum: u8,
blkno: u32,
lsn: u64,
lsn: Lsn,
}
#[derive(Debug)]
@@ -373,7 +374,7 @@ impl FeMessage {
relnode: body.get_u32(),
forknum: body.get_u8(),
blkno: body.get_u32(),
lsn: body.get_u64(),
lsn: Lsn::from(body.get_u64()),
};
// TODO: consider using protobuf or serde bincode for less error prone

View File

@@ -34,6 +34,7 @@ use crate::waldecoder::{decode_wal_record, WalStreamDecoder};
use crate::PageServerConf;
use crate::ZTimelineId;
use postgres_ffi::xlog_utils::*;
use zenith_utils::lsn::Lsn;
// From pg_tablespace_d.h
//
@@ -60,20 +61,21 @@ pub fn restore_timeline(
.join(timeline.to_string())
.join("snapshots");
let mut last_snapshot_lsn: u64 = 0;
let mut last_snapshot_lsn: Lsn = Lsn(0);
for direntry in fs::read_dir(&snapshotspath).unwrap() {
let direntry = direntry?;
let filename = direntry.file_name().to_str().unwrap().to_owned();
let lsn = u64::from_str_radix(&filename, 16)?;
let filename = direntry.file_name();
let lsn = Lsn::from_filename(&filename)?;
last_snapshot_lsn = max(lsn, last_snapshot_lsn);
restore_snapshot(conf, pcache, timeline, &filename)?;
info!("restored snapshot at {}", filename);
// FIXME: pass filename as Path instead of str?
let filename_str = filename.into_string().unwrap();
restore_snapshot(conf, pcache, timeline, &filename_str)?;
info!("restored snapshot at {:?}", filename_str);
}
if last_snapshot_lsn == 0 {
if last_snapshot_lsn == Lsn(0) {
error!(
"could not find valid snapshot in {}",
snapshotspath.display()
@@ -183,7 +185,7 @@ fn restore_relfile(
dboid: u32,
path: &Path,
) -> Result<()> {
let lsn = u64::from_str_radix(snapshot, 16)?;
let lsn = Lsn::from_hex(snapshot)?;
// Does it look like a relation file?
@@ -245,15 +247,16 @@ fn restore_wal(
_conf: &PageServerConf,
pcache: &PageCache,
timeline: ZTimelineId,
startpoint: u64,
startpoint: Lsn,
) -> Result<()> {
let walpath = format!("timelines/{}/wal", timeline);
let mut waldecoder = WalStreamDecoder::new(startpoint);
let mut segno = XLByteToSeg(startpoint, 16 * 1024 * 1024);
let mut offset = XLogSegmentOffset(startpoint, 16 * 1024 * 1024);
let mut last_lsn = 0;
const SEG_SIZE: u64 = 16 * 1024 * 1024;
let mut segno = startpoint.segment_number(SEG_SIZE);
let mut offset = startpoint.segment_offset(SEG_SIZE);
let mut last_lsn = Lsn(0);
loop {
// FIXME: assume postgresql tli 1 for now
let filename = XLogFileName(1, segno, 16 * 1024 * 1024);
@@ -336,11 +339,7 @@ fn restore_wal(
segno += 1;
offset = 0;
}
info!(
"reached end of WAL at {:X}/{:X}",
last_lsn >> 32,
last_lsn & 0xffffffff
);
info!("reached end of WAL at {}", last_lsn);
Ok(())
}

View File

@@ -4,6 +4,7 @@ use log::*;
use std::cmp::min;
use std::str;
use thiserror::Error;
use zenith_utils::lsn::Lsn;
const XLOG_BLCKSZ: u32 = 8192;
@@ -41,9 +42,9 @@ const SizeOfXLogLongPHD: usize = (2 + 2 + 4 + 8 + 4) + 4 + 8 + 4 + 4;
#[allow(dead_code)]
pub struct WalStreamDecoder {
lsn: u64,
lsn: Lsn,
startlsn: u64, // LSN where this record starts
startlsn: Lsn, // LSN where this record starts
contlen: u32,
padlen: u32,
@@ -56,7 +57,7 @@ pub struct WalStreamDecoder {
#[error("{msg} at {lsn}")]
pub struct WalDecodeError {
msg: String,
lsn: u64,
lsn: Lsn,
}
//
@@ -64,11 +65,11 @@ pub struct WalDecodeError {
// FIXME: This isn't a proper rust stream
//
impl WalStreamDecoder {
pub fn new(lsn: u64) -> WalStreamDecoder {
pub fn new(lsn: Lsn) -> WalStreamDecoder {
WalStreamDecoder {
lsn,
startlsn: 0,
startlsn: Lsn(0),
contlen: 0,
padlen: 0,
@@ -89,10 +90,10 @@ impl WalStreamDecoder {
/// Ok(None): there is not enough data in the input buffer. Feed more by calling the `feed_bytes` function
/// Err(WalDecodeError): an error occured while decoding, meaning the input was invalid.
///
pub fn poll_decode(&mut self) -> Result<Option<(u64, Bytes)>, WalDecodeError> {
pub fn poll_decode(&mut self) -> Result<Option<(Lsn, Bytes)>, WalDecodeError> {
loop {
// parse and verify page boundaries as we go
if self.lsn % WAL_SEGMENT_SIZE == 0 {
if self.lsn.segment_offset(WAL_SEGMENT_SIZE) == 0 {
// parse long header
if self.inputbuf.remaining() < SizeOfXLogLongPHD {
@@ -100,7 +101,7 @@ impl WalStreamDecoder {
}
let hdr = self.decode_XLogLongPageHeaderData();
if hdr.std.xlp_pageaddr != self.lsn {
if hdr.std.xlp_pageaddr != self.lsn.0 {
return Err(WalDecodeError {
msg: "invalid xlog segment header".into(),
lsn: self.lsn,
@@ -110,7 +111,8 @@ impl WalStreamDecoder {
self.lsn += SizeOfXLogLongPHD as u64;
continue;
} else if self.lsn % (XLOG_BLCKSZ as u64) == 0 {
} else if self.lsn.0 % (XLOG_BLCKSZ as u64) == 0 {
// FIXME: make this a member of Lsn, but what should it be called?
// parse page header
if self.inputbuf.remaining() < SizeOfXLogShortPHD {
@@ -118,7 +120,7 @@ impl WalStreamDecoder {
}
let hdr = self.decode_XLogPageHeaderData();
if hdr.xlp_pageaddr != self.lsn {
if hdr.xlp_pageaddr != self.lsn.0 {
return Err(WalDecodeError {
msg: "invalid xlog page header".into(),
lsn: self.lsn,
@@ -163,7 +165,8 @@ impl WalStreamDecoder {
continue;
} else {
// we're continuing a record, possibly from previous page.
let pageleft: u32 = XLOG_BLCKSZ - (self.lsn % (XLOG_BLCKSZ as u64)) as u32;
// FIXME: Should any of this math be captured into Lsn or a related type?
let pageleft: u32 = XLOG_BLCKSZ - (self.lsn.0 % (XLOG_BLCKSZ as u64)) as u32;
// read the rest of the record, or as much as fits on this page.
let n = min(self.contlen, pageleft) as usize;
@@ -184,16 +187,13 @@ impl WalStreamDecoder {
// XLOG_SWITCH records are special. If we see one, we need to skip
// to the next WAL segment.
if is_xlog_switch_record(&recordbuf) {
trace!(
"saw xlog switch record at {:X}/{:X}",
(self.lsn >> 32),
self.lsn & 0xffffffff
);
self.padlen = (WAL_SEGMENT_SIZE - (self.lsn % WAL_SEGMENT_SIZE)) as u32;
trace!("saw xlog switch record at {}", self.lsn);
self.padlen = (WAL_SEGMENT_SIZE - (self.lsn.0 % WAL_SEGMENT_SIZE)) as u32;
}
if self.lsn % 8 != 0 {
self.padlen = 8 - (self.lsn % 8) as u32;
// FIXME: what does this code do?
if self.lsn.0 % 8 != 0 {
self.padlen = 8 - (self.lsn.0 % 8) as u32;
}
let result = (self.lsn, recordbuf);

View File

@@ -31,6 +31,7 @@ use tokio::time::{sleep, Duration};
use tokio_postgres::replication::{PgTimestamp, ReplicationStream};
use tokio_postgres::{NoTls, SimpleQueryMessage, SimpleQueryRow};
use tokio_stream::StreamExt;
use zenith_utils::lsn::Lsn;
//
// We keep one WAL Receiver active per timeline.
@@ -138,7 +139,7 @@ async fn walreceiver_main(
let identify = identify_system(&rclient).await?;
info!("{:?}", identify);
let end_of_wal = u64::from(identify.xlogpos);
let end_of_wal = Lsn::from(u64::from(identify.xlogpos));
let mut caught_up = false;
let pcache = page_cache::get_pagecache(&conf, timelineid).unwrap();
@@ -148,7 +149,7 @@ async fn walreceiver_main(
//
let mut startpoint = pcache.get_last_valid_lsn();
let last_valid_lsn = pcache.get_last_valid_lsn();
if startpoint == 0 {
if startpoint == Lsn(0) {
// If we start here with identify.xlogpos we will have race condition with
// postgres start: insert into postgres may request page that was modified with lsn
// smaller than identify.xlogpos.
@@ -157,37 +158,31 @@ async fn walreceiver_main(
// different like having 'initdb' method on a pageserver (or importing some shared
// empty database snapshot), so for now I just put start of first segment which
// seems to be a valid record.
pcache.init_valid_lsn(0x_1_000_000_u64);
startpoint = 0x_1_000_000_u64;
pcache.init_valid_lsn(Lsn(0x_1_000_000));
startpoint = Lsn(0x_1_000_000);
} else {
// There might be some padding after the last full record, skip it.
//
// FIXME: It probably would be better to always start streaming from the beginning
// of the page, or the segment, so that we could check the page/segment headers
// too. Just for the sake of paranoia.
if startpoint % 8 != 0 {
startpoint += 8 - (startpoint % 8);
// FIXME: should any of this logic move inside the Lsn type?
if startpoint.0 % 8 != 0 {
startpoint += 8 - (startpoint.0 % 8);
}
}
debug!(
"last_valid_lsn {:X}/{:X} starting replication from {:X}/{:X} for timeline {}, server is at {:X}/{:X}...",
(last_valid_lsn >> 32),
(last_valid_lsn & 0xffffffff),
(startpoint >> 32),
(startpoint & 0xffffffff),
timelineid,
(end_of_wal >> 32),
(end_of_wal & 0xffffffff)
"last_valid_lsn {} starting replication from {} for timeline {}, server is at {}...",
last_valid_lsn, startpoint, timelineid, end_of_wal
);
let startpoint = PgLsn::from(startpoint);
let query = format!("START_REPLICATION PHYSICAL {}", startpoint);
let copy_stream = rclient.copy_both_simple::<bytes::Bytes>(&query).await?;
let physical_stream = ReplicationStream::new(copy_stream);
tokio::pin!(physical_stream);
let mut waldecoder = WalStreamDecoder::new(u64::from(startpoint));
let mut waldecoder = WalStreamDecoder::new(startpoint);
while let Some(replication_message) = physical_stream.next().await {
match replication_message? {
@@ -195,7 +190,7 @@ async fn walreceiver_main(
// Pass the WAL data to the decoder, and see if we can decode
// more records as a result.
let data = xlog_data.data();
let startlsn = xlog_data.wal_start();
let startlsn = Lsn::from(xlog_data.wal_start());
let endlsn = startlsn + data.len() as u64;
write_wal_file(
@@ -205,13 +200,7 @@ async fn walreceiver_main(
data,
)?;
trace!(
"received XLogData between {:X}/{:X} and {:X}/{:X}",
(startlsn >> 32),
(startlsn & 0xffffffff),
(endlsn >> 32),
(endlsn & 0xffffffff)
);
trace!("received XLogData between {} and {}", startlsn, endlsn);
waldecoder.feed_bytes(data);
@@ -298,11 +287,7 @@ async fn walreceiver_main(
pcache.advance_last_valid_lsn(endlsn);
if !caught_up && endlsn >= end_of_wal {
info!(
"caught up at LSN {:X}/{:X}",
(endlsn >> 32),
(endlsn & 0xffffffff)
);
info!("caught up at LSN {}", endlsn);
caught_up = true;
}
}
@@ -320,7 +305,7 @@ async fn walreceiver_main(
);
if reply_requested {
// TODO: More thought should go into what values are sent here.
let last_lsn = PgLsn::from(pcache.get_last_valid_lsn());
let last_lsn = PgLsn::from(u64::from(pcache.get_last_valid_lsn()));
let write_lsn = last_lsn;
let flush_lsn = last_lsn;
let apply_lsn = PgLsn::INVALID;
@@ -387,7 +372,7 @@ pub async fn identify_system(client: &tokio_postgres::Client) -> Result<Identify
}
fn write_wal_file(
startpos: XLogRecPtr,
startpos: Lsn,
timeline: ZTimelineId,
wal_seg_size: usize,
buf: &[u8],
@@ -401,7 +386,7 @@ fn write_wal_file(
let wal_dir = PathBuf::from(format!("timelines/{}/wal", timeline));
/* Extract WAL location for this block */
let mut xlogoff = XLogSegmentOffset(start_pos, wal_seg_size) as usize;
let mut xlogoff = start_pos.segment_offset(wal_seg_size as u64) as usize;
while bytes_left != 0 {
let bytes_to_write;
@@ -417,7 +402,7 @@ fn write_wal_file(
}
/* Open file */
let segno = XLByteToSeg(start_pos, wal_seg_size);
let segno = start_pos.segment_number(wal_seg_size as u64);
let wal_file_name = XLogFileName(
1, // FIXME: always use Postgres timeline 1
segno,
@@ -469,7 +454,7 @@ fn write_wal_file(
xlogoff += bytes_to_write;
/* Did we reach the end of a WAL segment? */
if XLogSegmentOffset(start_pos, wal_seg_size) == 0 {
if start_pos.segment_offset(wal_seg_size as u64) == 0 {
xlogoff = 0;
if partial {
fs::rename(&wal_file_partial_path, &wal_file_path)?;

View File

@@ -14,6 +14,7 @@
// TODO: Even though the postgres code runs in a separate process,
// it's not a secure sandbox.
//
use bytes::{Buf, BufMut, Bytes, BytesMut};
use log::*;
use std::assert;
use std::cell::RefCell;
@@ -31,8 +32,7 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::process::{Child, ChildStdin, ChildStdout, Command};
use tokio::sync::{mpsc, oneshot};
use tokio::time::timeout;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use zenith_utils::lsn::Lsn;
use crate::page_cache::BufferTag;
use crate::page_cache::PageCache;
@@ -67,7 +67,7 @@ struct WalRedoManagerInternal {
#[derive(Debug)]
struct WalRedoRequest {
tag: BufferTag,
lsn: u64,
lsn: Lsn,
response_channel: oneshot::Sender<Result<Bytes, WalRedoError>>,
}
@@ -138,7 +138,7 @@ impl WalRedoManager {
/// Request the WAL redo manager to apply WAL records, to reconstruct the page image
/// of the given page version.
///
pub async fn request_redo(&self, tag: BufferTag, lsn: u64) -> Result<Bytes, WalRedoError> {
pub async fn request_redo(&self, tag: BufferTag, lsn: Lsn) -> Result<Bytes, WalRedoError> {
// Create a channel where to receive the response
let (tx, rx) = oneshot::channel::<Result<Bytes, WalRedoError>>();
@@ -225,18 +225,16 @@ impl WalRedoManagerInternal {
} else if info == pg_constants::XLOG_XACT_ABORT {
status = pg_constants::TRANSACTION_STATUS_ABORTED;
} else {
trace!("handle_apply_request for RM_XACT_ID-{} NOT SUPPORTED YET. RETURN. lsn {:X}/{:X} main_data_offset {}, rec.len {}",
trace!("handle_apply_request for RM_XACT_ID-{} NOT SUPPORTED YET. RETURN. lsn {} main_data_offset {}, rec.len {}",
status,
record.lsn >> 32,
record.lsn & 0xffffffff,
record.lsn,
record.main_data_offset, record.rec.len());
return;
}
trace!("handle_apply_request for RM_XACT_ID-{} (1-commit, 2-abort) lsn {:X}/{:X} main_data_offset {}, rec.len {}",
trace!("handle_apply_request for RM_XACT_ID-{} (1-commit, 2-abort) lsn {} main_data_offset {}, rec.len {}",
status,
record.lsn >> 32,
record.lsn & 0xffffffff,
record.lsn,
record.main_data_offset, record.rec.len());
let byteno: usize = ((xl_rmid as u32 % pg_constants::CLOG_XACTS_PER_PAGE as u32)
@@ -305,9 +303,8 @@ impl WalRedoManagerInternal {
let info = xl_info & !pg_constants::XLR_INFO_MASK;
if info == pg_constants::CLOG_ZEROPAGE {
page.clone_from_slice(zero_page_bytes);
trace!("handle_apply_request for RM_CLOG_ID-CLOG_ZEROPAGE lsn {:X}/{:X} main_data_offset {}, rec.len {}",
record.lsn >> 32,
record.lsn & 0xffffffff,
trace!("handle_apply_request for RM_CLOG_ID-CLOG_ZEROPAGE lsn {} main_data_offset {}, rec.len {}",
record.lsn,
record.main_data_offset, record.rec.len());
}
} else if xl_rmid == pg_constants::RM_XACT_ID {
@@ -325,11 +322,10 @@ impl WalRedoManagerInternal {
let result: Result<Bytes, WalRedoError>;
trace!(
"applied {} WAL records in {} ms to reconstruct page image at LSN {:X}/{:X}",
"applied {} WAL records in {} ms to reconstruct page image at LSN {}",
nrecords,
duration.as_millis(),
lsn >> 32,
lsn & 0xffff_ffff
lsn
);
if let Err(e) = apply_result {
@@ -536,13 +532,13 @@ fn build_push_page_msg(tag: BufferTag, base_img: Bytes) -> Bytes {
buf.freeze()
}
fn build_apply_record_msg(endlsn: u64, rec: Bytes) -> Bytes {
fn build_apply_record_msg(endlsn: Lsn, rec: Bytes) -> Bytes {
let len = 4 + 8 + rec.len();
let mut buf = BytesMut::with_capacity(1 + len);
buf.put_u8(b'A');
buf.put_u32(len as u32);
buf.put_u64(endlsn);
buf.put_u64(endlsn.0);
buf.put(rec);
assert!(buf.len() == 1 + len);