refactor walreceiver and restore_local_repo

This commit is contained in:
anastasia
2021-05-05 15:13:08 +03:00
committed by lubennikovaav
parent 29f122009a
commit 15db0d1d6f
7 changed files with 82 additions and 89 deletions

View File

@@ -1,6 +1,6 @@
pub mod rocksdb;
use crate::waldecoder::Oid;
use crate::waldecoder::{Oid, DecodedWALRecord};
use crate::ZTimelineId;
use anyhow::Result;
use bytes::{Buf, BufMut, Bytes, BytesMut};
@@ -47,6 +47,13 @@ pub trait Timeline {
src_tablespace_id: Oid,
) -> Result<()>;
/// Save an already-decoded WAL record into the timeline.
///
/// * `decoded` - the parsed form of the record (header fields and the list
///   of blocks it references).
/// * `recdata` - the raw bytes of the same WAL record.
/// * `lsn` - the LSN associated with the record.
///
/// Returns an error if the implementation fails to store the record or to
/// apply any of its side effects.
fn save_decoded_record(
&self,
decoded: DecodedWALRecord,
recdata: Bytes,
lsn: Lsn
) -> anyhow::Result<()>;
fn advance_last_valid_lsn(&self, lsn: Lsn);
fn get_last_valid_lsn(&self) -> Lsn;
fn init_valid_lsn(&self, lsn: Lsn);

View File

@@ -7,7 +7,7 @@
use crate::repository::{BufferTag, RelTag, Repository, Timeline, WALRecord};
use crate::restore_local_repo::restore_timeline;
use crate::waldecoder::Oid;
use crate::waldecoder::{Oid, DecodedWALRecord, XlSmgrTruncate, XlCreateDatabase};
use crate::walredo::WalRedoManager;
use crate::ZTimelineId;
use crate::{zenith_repo_dir, PageServerConf};
@@ -24,6 +24,8 @@ use std::thread;
use std::time::{Duration, Instant};
use zenith_utils::lsn::{AtomicLsn, Lsn};
use zenith_utils::seqwait::SeqWait;
use postgres_ffi::pg_constants;
// Timeout when waiting for the WAL receiver to catch up to an LSN given in a GetPage@LSN call.
static TIMEOUT: Duration = Duration::from_secs(60);
@@ -794,6 +796,68 @@ impl Timeline for RocksTimeline {
Ok(())
}
/// Store a decoded WAL record in the page cache and apply its
/// relation-level side effects.
///
/// A separate `WALRecord` entry is stored for every block the record
/// modifies. Each entry holds its own clone of `recdata`; `Bytes` is
/// reference counted, so the underlying buffer is shared between them.
///
/// In addition:
/// * an SMGR truncate record carrying the `SMGR_TRUNCATE_HEAP` flag is
///   passed on to `put_truncation` for the relation's main fork;
/// * a database-creation record is passed on to `create_database`.
///
/// Once everything has been handled, the last record LSN is advanced to
/// `lsn`. Errors from `put_truncation` and `create_database` are returned
/// to the caller, in which case the LSN is not advanced.
fn save_decoded_record(
    &self,
    decoded: DecodedWALRecord,
    recdata: Bytes,
    lsn: Lsn,
) -> anyhow::Result<()> {
    let main_data_offset = decoded.main_data_offset as u32;

    // One page-cache entry per referenced block.
    for blk in decoded.blocks.iter() {
        self.put_wal_record(
            BufferTag {
                rel: RelTag {
                    spcnode: blk.rnode_spcnode,
                    dbnode: blk.rnode_dbnode,
                    relnode: blk.rnode_relnode,
                    forknum: blk.forknum as u8,
                },
                blknum: blk.blkno,
            },
            WALRecord {
                lsn,
                will_init: blk.will_init || blk.apply_image,
                rec: recdata.clone(),
                main_data_offset,
            },
        );
    }

    // Classify the record by resource manager and rmgr-specific info bits.
    let rmgr_info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
    let is_smgr_truncate = decoded.xl_rmid == pg_constants::RM_SMGR_ID
        && rmgr_info == pg_constants::XLOG_SMGR_TRUNCATE;
    let is_dbase_create = decoded.xl_rmid == pg_constants::RM_DBASE_ID
        && rmgr_info == pg_constants::XLOG_DBASE_CREATE;

    if is_smgr_truncate {
        // Only heap truncation is recorded, and it is recorded against the main fork.
        let truncate = XlSmgrTruncate::decode(&decoded);
        if (truncate.flags & pg_constants::SMGR_TRUNCATE_HEAP) != 0 {
            self.put_truncation(
                RelTag {
                    spcnode: truncate.rnode.spcnode,
                    dbnode: truncate.rnode.dbnode,
                    relnode: truncate.rnode.relnode,
                    forknum: pg_constants::MAIN_FORKNUM,
                },
                lsn,
                truncate.blkno,
            )?;
        }
    } else if is_dbase_create {
        let createdb = XlCreateDatabase::decode(&decoded);
        self.create_database(
            lsn,
            createdb.db_id,
            createdb.tablespace_id,
            createdb.src_db_id,
            createdb.src_tablespace_id,
        )?;
    }

    // The record is fully handled: tell the page cache it is up-to-date to this LSN.
    self.advance_last_record_lsn(lsn);
    Ok(())
}
/// Remember that WAL has been received and added to the timeline up to the given LSN
fn advance_last_valid_lsn(&self, lsn: Lsn) {
let old = self.last_valid_lsn.advance(lsn);

View File

@@ -26,7 +26,7 @@ use std::path::{Path, PathBuf};
use anyhow::Result;
use bytes::Bytes;
use crate::repository::{BufferTag, RelTag, Timeline, WALRecord};
use crate::repository::{BufferTag, RelTag, Timeline};
use crate::waldecoder::{decode_wal_record, WalStreamDecoder};
use crate::PageServerConf;
use crate::ZTimelineId;
@@ -353,32 +353,7 @@ fn restore_wal(timeline: &dyn Timeline, timelineid: ZTimelineId, startpoint: Lsn
}
if let Some((lsn, recdata)) = rec.unwrap() {
let decoded = decode_wal_record(recdata.clone());
// Put the WAL record to the page cache. We make a separate copy of
// it for every block it modifies. (The actual WAL record is kept in
// a Bytes, which uses a reference counter for the underlying buffer,
// so having multiple copies of it doesn't cost that much)
for blk in decoded.blocks.iter() {
let tag = BufferTag {
rel: RelTag {
spcnode: blk.rnode_spcnode,
dbnode: blk.rnode_dbnode,
relnode: blk.rnode_relnode,
forknum: blk.forknum as u8,
},
blknum: blk.blkno,
};
let rec = WALRecord {
lsn,
will_init: blk.will_init || blk.apply_image,
rec: recdata.clone(),
main_data_offset: decoded.main_data_offset as u32,
};
timeline.put_wal_record(tag, rec);
}
// Now that this record has been handled, let the page cache know that
// it is up-to-date to this LSN
timeline.advance_last_valid_lsn(lsn);
timeline.save_decoded_record(decoded, recdata, lsn)?;
last_lsn = lsn;
} else {
break;

View File

@@ -309,8 +309,6 @@ pub struct DecodedWALRecord {
pub type Oid = u32;
pub type BlockNumber = u32;
pub const MAIN_FORKNUM: u8 = 0;
pub const SMGR_TRUNCATE_HEAP: u32 = 0x0001;
#[repr(C)]
#[derive(Debug, Clone, Copy)]

View File

@@ -7,14 +7,13 @@
//!
use crate::page_cache;
use crate::repository::{BufferTag, RelTag, Repository, Timeline, WALRecord};
use crate::repository::{Repository, Timeline};
use crate::waldecoder::*;
use crate::PageServerConf;
use crate::ZTimelineId;
use anyhow::Error;
use lazy_static::lazy_static;
use log::*;
use postgres_ffi::pg_constants;
use postgres_ffi::xlog_utils::*;
use postgres_protocol::message::backend::ReplicationMessage;
use postgres_types::PgLsn;
@@ -219,61 +218,7 @@ fn walreceiver_main(
loop {
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
let decoded = decode_wal_record(recdata.clone());
// Put the WAL record to the page cache. We make a separate copy of
// it for every block it modifies. (The actual WAL record is kept in
// a Bytes, which uses a reference counter for the underlying buffer,
// so having multiple copies of it doesn't cost that much)
for blk in decoded.blocks.iter() {
let tag = BufferTag {
rel: RelTag {
spcnode: blk.rnode_spcnode,
dbnode: blk.rnode_dbnode,
relnode: blk.rnode_relnode,
forknum: blk.forknum as u8,
},
blknum: blk.blkno,
};
let rec = WALRecord {
lsn,
will_init: blk.will_init || blk.apply_image,
rec: recdata.clone(),
main_data_offset: decoded.main_data_offset as u32,
};
timeline.put_wal_record(tag, rec);
}
// include truncate wal record in all pages
if decoded.xl_rmid == pg_constants::RM_SMGR_ID
&& (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== pg_constants::XLOG_SMGR_TRUNCATE
{
let truncate = XlSmgrTruncate::decode(&decoded);
if (truncate.flags & SMGR_TRUNCATE_HEAP) != 0 {
let rel = RelTag {
spcnode: truncate.rnode.spcnode,
dbnode: truncate.rnode.dbnode,
relnode: truncate.rnode.relnode,
forknum: MAIN_FORKNUM,
};
timeline.put_truncation(rel, lsn, truncate.blkno)?;
}
} else if decoded.xl_rmid == pg_constants::RM_DBASE_ID
&& (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== pg_constants::XLOG_DBASE_CREATE
{
let createdb = XlCreateDatabase::decode(&decoded);
timeline.create_database(
lsn,
createdb.db_id,
createdb.tablespace_id,
createdb.src_db_id,
createdb.src_tablespace_id,
)?;
}
// Now that this record has been handled, let the page cache know that
// it is up-to-date to this LSN
timeline.advance_last_record_lsn(lsn);
timeline.save_decoded_record(decoded, recdata, lsn)?;
} else {
break;
}

View File

@@ -98,3 +98,7 @@ pub const BKPBLOCK_SAME_REL: u8 = 0x80; /* RelFileNode omitted, same as previous
pub const BKPIMAGE_HAS_HOLE: u8 = 0x01; /* page image has "hole" */
pub const BKPIMAGE_IS_COMPRESSED: u8 = 0x02; /* page image is compressed */
pub const BKPIMAGE_APPLY: u8 = 0x04; /* page image should be restored during replay */
pub const MAIN_FORKNUM: u8 = 0; /* fork number of a relation's main fork */
pub const SMGR_TRUNCATE_HEAP: u32 = 0x0001; /* smgr truncate record flag: heap (main fork) is truncated */