Support collecting nonrel object in GC

This commit is contained in:
Konstantin Knizhnik
2021-06-09 22:16:35 +03:00
parent 41c352be7f
commit a3b70745a9
12 changed files with 196 additions and 238 deletions

View File

@@ -17,8 +17,7 @@ use std::time::SystemTime;
use tar::{Builder, Header};
use walkdir::WalkDir;
use crate::repository::{DatabaseTag, ObjectTag, SlruBufferTag, Timeline};
use postgres_ffi::nonrelfile_utils::*;
use crate::repository::{DatabaseTag, ObjectTag, Timeline};
use postgres_ffi::relfile_utils::*;
use postgres_ffi::xlog_utils::*;
use postgres_ffi::*;
@@ -188,21 +187,14 @@ impl<'a> Basebackup<'a> {
Ok(())
}
// Look up the CLOG status of transaction `xid` as of this backup's LSN
fn get_tx_status(&self, xid: TransactionId) -> anyhow::Result<u8> {
    // Each CLOG page covers CLOG_XACTS_PER_PAGE transactions, so integer
    // division gives the block that holds this xid's status
    let clog_tag = ObjectTag::Clog(SlruBufferTag {
        blknum: xid / pg_constants::CLOG_XACTS_PER_PAGE,
    });
    let page = self.timeline.get_page_at_lsn(clog_tag, self.lsn)?;
    Ok(transaction_id_get_status(xid, &page[..]))
}
//
// Extract twophase state files
//
fn add_twophase_file(&mut self, tag: &ObjectTag, xid: TransactionId) -> anyhow::Result<()> {
// Only include the two-phase state files of transactions that are still in progress
if self.get_tx_status(xid)? == pg_constants::TRANSACTION_STATUS_IN_PROGRESS {
if self.timeline.get_tx_status(xid, self.lsn)?
== pg_constants::TRANSACTION_STATUS_IN_PROGRESS
{
let img = self.timeline.get_page_at_lsn_nowait(*tag, self.lsn)?;
let mut buf = BytesMut::new();
buf.extend_from_slice(&img[..]);
@@ -225,8 +217,8 @@ impl<'a> Basebackup<'a> {
let pg_control_bytes = self
.timeline
.get_page_at_lsn_nowait(ObjectTag::ControlFile, self.lsn)?;
let mut pg_control = postgres_ffi::decode_pg_control(pg_control_bytes)?;
let mut checkpoint = postgres_ffi::decode_checkpoint(checkpoint_bytes)?;
let mut pg_control = ControlFileData::decode(&pg_control_bytes)?;
let mut checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
// Here starts pg_resetwal inspired magic
// Generate new pg_control and WAL needed for bootstrap
@@ -234,7 +226,7 @@ impl<'a> Basebackup<'a> {
let new_lsn = XLogSegNoOffsetToRecPtr(
new_segno,
SizeOfXLogLongPHD as u32,
XLOG_SIZE_OF_XLOG_LONG_PHD as u32,
pg_constants::WAL_SEGMENT_SIZE,
);
checkpoint.redo = new_lsn;
@@ -247,7 +239,7 @@ impl<'a> Basebackup<'a> {
pg_control.checkPointCopy = checkpoint;
//send pg_control
let pg_control_bytes = postgres_ffi::encode_pg_control(pg_control);
let pg_control_bytes = pg_control.encode();
let header = new_tar_header("global/pg_control", pg_control_bytes.len() as u64)?;
self.ar.append(&header, &pg_control_bytes[..])?;

View File

@@ -21,8 +21,8 @@ use crate::{PageServerConf, ZTimelineId};
use anyhow::{bail, Context, Result};
use bytes::Bytes;
use log::*;
use postgres_ffi::pg_constants;
use serde::{Deserialize, Serialize};
use std::cmp::max;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::convert::TryInto;
use std::sync::{Arc, Mutex, RwLock};
@@ -725,62 +725,160 @@ impl ObjectTimeline {
// WAL is large enough to perform GC
let now = Instant::now();
let mut truncated = 0u64;
let mut inspected = 0u64;
let mut deleted = 0u64;
// Iterate through all relations
for rels in &self.obj_store.list_rels(self.timelineid, 0, 0, last_lsn)? {
let mut last_version = true;
let mut key = relation_size_key(self.timelineid, *rels);
let mut max_size = 0u32;
let mut relation_dropped = false;
// Process relation metadata versions
for vers in self.obj_store.object_versions(&key, horizon)? {
let lsn = vers.0;
let rel_meta = RelationSizeEntry::des(&vers.1)?;
// If relation is dropped at the horizon,
// we can remove all its versions including last (Unlink)
match rel_meta {
RelationSizeEntry::Size(size) => max_size = max(max_size, size),
RelationSizeEntry::Unlink => {
// Iterate through all objects in timeline
for obj in self
.obj_store
.list_objects(self.timelineid, false, last_lsn)?
{
inspected += 1;
match obj {
// Prepared (two-phase) transactions
ObjectTag::TwoPhase(prepare) => {
    let key = ObjectKey {
        timeline: self.timelineid,
        tag: obj,
    };
    for vers in self.obj_store.object_versions(&key, horizon)? {
        // Keep the version while the transaction is still in progress at the horizon
        let status = self.get_tx_status(prepare.xid, horizon)?;
        if status == pg_constants::TRANSACTION_STATUS_IN_PROGRESS {
            continue;
        }
        // Transaction is no longer in progress: this version can be removed
        self.obj_store.unlink(&key, vers.0)?;
        deleted += 1;
    }
}
ObjectTag::RelationMetadata(_) => {
// Relation metadata has no page images to reconstruct:
// just delete all old versions over horizon.
// `last_version` is true only for the first version yielded by the iterator.
let mut last_version = true;
let key = ObjectKey {
timeline: self.timelineid,
tag: obj,
};
for vers in self.obj_store.object_versions(&key, horizon)? {
let lsn = vers.0;
if last_version {
// NOTE(review): `relation_dropped` and `rels` are not declared inside this arm;
// the next two lines look like residue of the previous implementation — confirm.
relation_dropped = true;
info!("Relation {:?} dropped", rels);
let content = vers.1;
// The first version is removed only if it is an Unlink marker
match ObjectValue::des(&content[..])? {
ObjectValue::Unlink => {
self.obj_store.unlink(&key, lsn)?;
deleted += 1;
}
_ => (), // preserve last version
}
last_version = false;
truncated += 1;
} else {
// Every subsequent version is deleted unconditionally
self.obj_store.unlink(&key, lsn)?;
deleted += 1;
}
}
}
if last_version {
last_version = false;
if !relation_dropped {
// preserve last version
continue;
ObjectTag::RelationBuffer(tag) => {
    // Reconstruct last page
    self.get_page_at_lsn_nowait(obj, last_lsn)?;
    // Reconstruct page at horizon unless relation was dropped
    // and delete all older versions over horizon.
    // `last_version` is true only for the first version yielded by the iterator.
    let mut last_version = true;
    let key = ObjectKey {
        timeline: self.timelineid,
        tag: obj,
    };
    for vers in self.obj_store.object_versions(&key, horizon)? {
        let lsn = vers.0;
        if last_version {
            truncated += 1;
            last_version = false;
            if let Some(rel_size) = self.relsize_get_nowait(tag.rel, lsn)? {
                if rel_size > tag.blknum {
                    // preserve and materialize last version before deleting all preceding
                    self.get_page_at_lsn_nowait(obj, lsn)?;
                    continue;
                }
                debug!("Drop last block {} of relation {:?} at {} because it is beyond relation size {}", tag.blknum, tag.rel, lsn, rel_size);
            } else {
                // No relation size is known at this version's LSN; the block is kept
                // only if the relation has a size at `last_lsn`.
                if let Some(rel_size) =
                    self.relsize_get_nowait(tag.rel, last_lsn)?
                {
                    // Arguments follow the placeholder order: block number first,
                    // then the relation (same as the "Drop last block" message above).
                    debug!("Preserve block {} of relation {:?} at {} because relation has size {} at {}", tag.blknum, tag.rel, lsn, rel_size, last_lsn);
                    continue;
                }
                debug!("Relation {:?} was dropped at {}", tag.rel, lsn);
            }
            // relation was dropped or truncated so this block can be removed
        }
        self.obj_store.unlink(&key, lsn)?;
        deleted += 1;
    }
}
self.obj_store.unlink(&key, lsn)?;
deleted += 1;
}
// Now process all relation blocks
for blknum in 0..max_size {
key.buf_tag.blknum = blknum;
last_version = true;
for vers in self.obj_store.object_versions(&key, horizon)? {
let lsn = vers.0;
if last_version {
last_version = false;
truncated += 1;
if !relation_dropped {
// preserve and materialize last version before deleting all preceeding
self.get_page_at_lsn_nowait(key.buf_tag, lsn)?;
continue;
// SLRU pages (CLOG and MultiXact offsets/members)
ObjectTag::Clog(_)
| ObjectTag::MultiXactOffsets(_)
| ObjectTag::MultiXactMembers(_) => {
// Materialize last version
self.get_page_at_lsn_nowait(obj, last_lsn)?;
// Remove old versions over horizon.
// `last_version` is true only for the first version yielded by the iterator.
let mut last_version = true;
let key = ObjectKey {
timeline: self.timelineid,
tag: obj,
};
for vers in self.obj_store.object_versions(&key, horizon)? {
let lsn = vers.0;
if last_version {
let content = vers.1;
match ObjectValue::des(&content[..])? {
ObjectValue::Unlink => {
// An Unlink marker carries no data worth keeping
self.obj_store.unlink(&key, lsn)?;
deleted += 1;
}
ObjectValue::WALRecord(_) => {
// preserve and materialize last version before deleting all preceding
self.get_page_at_lsn_nowait(obj, lsn)?;
}
_ => {} // do nothing if already materialized
}
last_version = false;
truncated += 1;
} else {
// delete obsolete version
self.obj_store.unlink(&key, lsn)?;
deleted += 1;
}
}
// NOTE(review): `lsn` is declared inside the loop above and is not in scope here;
// the next two lines look like residue of the previous implementation — confirm.
self.obj_store.unlink(&key, lsn)?;
deleted += 1;
}
// Versioned, always-materialized objects: no need to reconstruct pages
ObjectTag::Checkpoint | ObjectTag::ControlFile => {
    let key = ObjectKey {
        timeline: self.timelineid,
        tag: obj,
    };
    // Walk the versions over the horizon: keep the first one yielded
    // (treated as the last version) and unlink every one after it.
    let mut kept_last = false;
    for vers in self.obj_store.object_versions(&key, horizon)? {
        if kept_last {
            // delete obsolete version
            let lsn = vers.0;
            self.obj_store.unlink(&key, lsn)?;
            deleted += 1;
        } else {
            // preserve last version
            kept_last = true;
            truncated += 1;
        }
    }
}
_ => (), // do nothing
}
}
info!("Garbage collection completed in {:?}: {} version histories truncated, {} versions deleted",
now.elapsed(), truncated, deleted);
info!("Garbage collection completed in {:?}:\n{} version chains inspected, {} version histories truncated, {} versions deleted",
now.elapsed(), inspected, truncated, deleted);
}
}
}

View File

@@ -229,7 +229,7 @@ impl PageServerHandler {
PagestreamBeMessage::Nblocks(PagestreamStatusResponse { ok: true, n_blocks })
}
PagestreamFeMessage::Read(req) => {
let buf_tag = BufferTag {
let tag = ObjectTag::RelationBuffer(BufferTag {
rel: RelTag {
spcnode: req.spcnode,
dbnode: req.dbnode,
@@ -237,9 +237,9 @@ impl PageServerHandler {
forknum: req.forknum,
},
blknum: req.blkno,
};
});
let read_response = match timeline.get_page_at_lsn(buf_tag, req.lsn) {
let read_response = match timeline.get_page_at_lsn(tag, req.lsn) {
Ok(p) => PagestreamReadResponse {
ok: true,
n_blocks: 0,
@@ -291,13 +291,17 @@ impl PageServerHandler {
let snapshot_lsn =
restore_local_repo::find_latest_snapshot(&self.conf, timelineid).unwrap();
let req_lsn = lsn.unwrap_or(snapshot_lsn);
basebackup::send_tarball_at_lsn(
&mut CopyDataSink { pgb },
timelineid,
&timeline,
req_lsn,
snapshot_lsn,
)?;
{
let mut writer = CopyDataSink { pgb };
let mut basebackup = basebackup::Basebackup::new(
&mut writer,
timelineid,
&timeline,
req_lsn,
snapshot_lsn,
);
basebackup.send_tarball()?;
}
pgb.write_message(&BeMessage::CopyDone)?;
debug!("CopyDone sent!");
@@ -507,156 +511,6 @@ impl postgres_backend::Handler for PageServerHandler {
pgb.flush()?;
Ok(())
}
// Reply to a control-file request with three backend messages:
// RowDescription, ControlFile and CommandComplete.
// Any write error is propagated to the caller.
fn handle_controlfile(&mut self) -> io::Result<()> {
// The first two messages are written without flushing...
self.write_message_noflush(&BeMessage::RowDescription)?;
self.write_message_noflush(&BeMessage::ControlFile)?;
// ...and the last one goes through `write_message`
// (presumably the flushing variant — confirm against its definition).
self.write_message(&BeMessage::CommandComplete)?;
Ok(())
}
// Serve the page-request stream for `timelineid`.
//
// Fails if the timeline is not known to the repository. Otherwise switches
// the client to COPYBOTH mode and then, for every CopyData message received,
// parses a pagestream request (Exists / Nblocks / Read) and writes back the
// serialized response as CopyData. Returns once `read_message` yields `None`.
fn handle_pagerequests(&mut self, timelineid: ZTimelineId) -> anyhow::Result<()> {
// Check that the timeline exists
let repository = page_cache::get_repository();
let timeline = repository.get_timeline(timelineid).map_err(|_| {
anyhow!(
"client requested pagestream on timeline {} which does not exist in page server",
timelineid
)
})?;
/* switch client to COPYBOTH */
// Message layout: tag 'W', i32 length (4 length bytes + 1 format byte + 2 count bytes),
// format byte, i16 column count.
self.stream.write_u8(b'W')?;
self.stream.write_i32::<BE>(4 + 1 + 2)?;
self.stream.write_u8(0)?; /* copy_is_binary */
self.stream.write_i16::<BE>(0)?; /* numAttributes */
self.stream.flush()?;
while let Some(message) = self.read_message()? {
trace!("query({:?}): {:?}", timelineid, message);
// Only CopyData messages carry requests; everything else is skipped
let copy_data_bytes = match message {
FeMessage::CopyData(bytes) => bytes,
_ => continue,
};
let zenith_fe_msg = PagestreamFeMessage::parse(copy_data_bytes)?;
let response = match zenith_fe_msg {
PagestreamFeMessage::Exists(req) => {
let tag = RelTag {
spcnode: req.spcnode,
dbnode: req.dbnode,
relnode: req.relnode,
forknum: req.forknum,
};
// A lookup error is reported to the client as "does not exist"
let exist = timeline.get_rel_exists(tag, req.lsn).unwrap_or(false);
PagestreamBeMessage::Status(PagestreamStatusResponse {
ok: exist,
n_blocks: 0,
})
}
PagestreamFeMessage::Nblocks(req) => {
let tag = RelTag {
spcnode: req.spcnode,
dbnode: req.dbnode,
relnode: req.relnode,
forknum: req.forknum,
};
// A lookup error is reported to the client as size 0 with ok == true
let n_blocks = timeline.get_rel_size(tag, req.lsn).unwrap_or(0);
PagestreamBeMessage::Nblocks(PagestreamStatusResponse { ok: true, n_blocks })
}
PagestreamFeMessage::Read(req) => {
let buf_tag = ObjectTag::RelationBuffer(BufferTag {
rel: RelTag {
spcnode: req.spcnode,
dbnode: req.dbnode,
relnode: req.relnode,
forknum: req.forknum,
},
blknum: req.blkno,
});
let read_response = match timeline.get_page_at_lsn(buf_tag, req.lsn) {
Ok(p) => PagestreamReadResponse {
ok: true,
n_blocks: 0,
page: p,
},
Err(e) => {
// On failure, log the error and answer with ok == false and an all-zero page
const ZERO_PAGE: [u8; 8192] = [0; 8192];
error!("get_page_at_lsn: {}", e);
PagestreamReadResponse {
ok: false,
n_blocks: 0,
page: Bytes::from_static(&ZERO_PAGE),
}
}
};
PagestreamBeMessage::Read(read_response)
}
};
self.write_message(&BeMessage::CopyData(response.serialize()))?;
}
Ok(())
}
// Stream a base backup tarball of `timelineid` to the client.
//
// Fails if the timeline is not known to the repository. Otherwise switches
// the client to COPYOUT mode, sends the tarball produced by
// `basebackup::Basebackup` through a `CopyDataSink`, and finishes with CopyDone.
// When `lsn` is `None`, the timeline's last valid LSN is used.
fn handle_basebackup_request(
&mut self,
timelineid: ZTimelineId,
lsn: Option<Lsn>,
) -> anyhow::Result<()> {
// check that the timeline exists
let repository = page_cache::get_repository();
let timeline = repository.get_timeline(timelineid).map_err(|e| {
error!("error fetching timeline: {:?}", e);
anyhow!(
"client requested basebackup on timeline {} which does not exist in page server",
timelineid
)
})?;
/* switch client to COPYOUT */
// Message layout: tag 'H', i32 length (4 length bytes + 1 format byte + 2 count bytes),
// format byte, i16 column count.
let stream = &mut self.stream;
stream.write_u8(b'H')?;
stream.write_i32::<BE>(4 + 1 + 2)?;
stream.write_u8(0)?; /* copy_is_binary */
stream.write_i16::<BE>(0)?; /* numAttributes */
stream.flush()?;
info!("sent CopyOut");
/* Send a tarball of the latest snapshot on the timeline */
// find latest snapshot
// NOTE(review): `.unwrap()` panics if no snapshot can be found; consider
// propagating the failure instead — confirm the return type of `find_latest_snapshot`.
let snapshot_lsn =
restore_local_repo::find_latest_snapshot(&self.conf, timelineid).unwrap();
let req_lsn = lsn.unwrap_or_else(|| timeline.get_last_valid_lsn());
{
// Inner scope: the writer and the backup borrow `stream` only while the
// tarball is produced, so `self.stream` can be used directly afterwards.
let mut writer = CopyDataSink { stream };
let mut basebackup = basebackup::Basebackup::new(
&mut writer,
timelineid,
&timeline,
req_lsn,
snapshot_lsn,
);
basebackup.send_tarball()?;
}
// CopyDone
// Tag 'c' followed by a length of 4 (the length field only, no payload)
self.stream.write_u8(b'c')?;
self.stream.write_u32::<BE>(4)?;
self.stream.flush()?;
debug!("CopyDone sent!");
Ok(())
}
}
///

View File

@@ -2,6 +2,8 @@ use crate::waldecoder::TransactionId;
use crate::ZTimelineId;
use anyhow::Result;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use postgres_ffi::nonrelfile_utils::transaction_id_get_status;
use postgres_ffi::pg_constants;
use postgres_ffi::relfile_utils::forknumber_to_name;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
@@ -106,6 +108,15 @@ pub trait Timeline: Send + Sync {
/// Relation size is increased implicitly and decreased with Truncate updates.
// TODO ordering guarantee?
fn history<'a>(&'a self) -> Result<Box<dyn History + 'a>>;
// Look up the CLOG status of transaction `xid` as of `lsn`
fn get_tx_status(&self, xid: TransactionId, lsn: Lsn) -> anyhow::Result<u8> {
    // Each CLOG page covers CLOG_XACTS_PER_PAGE transactions, so integer
    // division gives the block that holds this xid's status
    let clog_tag = ObjectTag::Clog(SlruBufferTag {
        blknum: xid / pg_constants::CLOG_XACTS_PER_PAGE,
    });
    let page = self.get_page_at_lsn(clog_tag, lsn)?;
    Ok(transaction_id_get_status(xid, &page[..]))
}
}
pub trait History: Iterator<Item = Result<RelationUpdate>> {

View File

@@ -288,11 +288,11 @@ pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint:
let mut last_lsn = startpoint;
let checkpoint_bytes = timeline.get_page_at_lsn_nowait(ObjectTag::Checkpoint, startpoint)?;
let mut checkpoint = decode_checkpoint(checkpoint_bytes)?;
let mut checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
if checkpoint.nextXid.value == 0 {
let pg_control_bytes =
timeline.get_page_at_lsn_nowait(ObjectTag::ControlFile, startpoint)?;
let pg_control = decode_pg_control(pg_control_bytes)?;
let pg_control = ControlFileData::decode(&pg_control_bytes)?;
checkpoint = pg_control.checkPointCopy;
}
@@ -358,7 +358,7 @@ pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint:
offset = 0;
}
info!("reached end of WAL at {}", last_lsn);
let checkpoint_bytes = encode_checkpoint(checkpoint);
let checkpoint_bytes = checkpoint.encode();
timeline.put_page_image(ObjectTag::Checkpoint, last_lsn, checkpoint_bytes)?;
Ok(())
}

View File

@@ -175,16 +175,16 @@ impl ObjectStore for RocksObjectStore {
let key = StorageKey::des(iter.key().unwrap())?;
if let ObjectTag::RelationBuffer(buf_tag) = key.obj_key.tag {
if (spcnode != 0 && buf_tag.rel.spcnode != spcnode)
|| (dbnode != 0 && buf_tag.rel.dbnode != dbnode)
{
|| (dbnode != 0 && buf_tag.rel.dbnode != dbnode)
{
break;
}
if key.lsn < lsn {
rels.insert(buf_tag.rel);
}
let mut next_tag = buf_tag.clone();
next_tag.rel.relnode += 1; // skip to next relation
search_key = ObjectTag::RelationBuffernext_tag);
let mut next_tag = buf_tag.clone();
next_tag.rel.relnode += 1; // skip to next relation
search_key.obj_key.tag = ObjectTag::RelationBuffer(next_tag);
} else {
break;
}

View File

@@ -7,6 +7,7 @@ use bytes::{Buf, BufMut, Bytes, BytesMut};
use log::*;
use postgres_ffi::pg_constants;
use postgres_ffi::xlog_utils::*;
use postgres_ffi::CheckPoint;
use postgres_ffi::XLogLongPageHeaderData;
use postgres_ffi::XLogPageHeaderData;
use postgres_ffi::XLogRecord;

View File

@@ -176,7 +176,7 @@ fn walreceiver_main(
let mut waldecoder = WalStreamDecoder::new(startpoint);
let checkpoint_bytes = timeline.get_page_at_lsn_nowait(ObjectTag::Checkpoint, startpoint)?;
let mut checkpoint = decode_checkpoint(checkpoint_bytes)?;
let mut checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);
while let Some(replication_message) = physical_stream.next()? {
@@ -196,12 +196,12 @@ fn walreceiver_main(
waldecoder.feed_bytes(data);
while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
let old_checkpoint_bytes = encode_checkpoint(checkpoint);
let old_checkpoint_bytes = checkpoint.encode();
let decoded = decode_wal_record(&mut checkpoint, recdata.clone());
restore_local_repo::save_decoded_record(&*timeline, &decoded, recdata, lsn)?;
last_rec_lsn = lsn;
let new_checkpoint_bytes = encode_checkpoint(checkpoint);
let new_checkpoint_bytes = checkpoint.encode();
if new_checkpoint_bytes != old_checkpoint_bytes {
timeline.put_page_image(
ObjectTag::Checkpoint,

View File

@@ -42,7 +42,7 @@ use crate::waldecoder::{MultiXactId, XlMultiXactCreate};
use crate::PageServerConf;
use postgres_ffi::nonrelfile_utils::transaction_id_set_status;
use postgres_ffi::pg_constants;
use postgres_ffi::xlog_utils::XLogRecord;
use postgres_ffi::XLogRecord;
///
/// WAL Redo Manager is responsible for replaying WAL records.

View File

@@ -4,7 +4,7 @@
include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
pub mod controlfile_utils;
pub mod pg_constants;
pub mod nonrelfile_utils;
pub mod pg_constants;
pub mod relfile_utils;
pub mod xlog_utils;

View File

@@ -7,9 +7,9 @@
// have been named the same as the corresponding PostgreSQL functions instead.
//
use crate::encode_checkpoint;
use crate::pg_constants;
use crate::CheckPoint;
use crate::ControlFileData;
use crate::FullTransactionId;
use crate::XLogLongPageHeaderData;
use crate::XLogPageHeaderData;
@@ -37,6 +37,7 @@ pub const MAX_SEND_SIZE: usize = XLOG_BLCKSZ * 16;
pub const XLOG_SIZE_OF_XLOG_SHORT_PHD: usize = std::mem::size_of::<XLogPageHeaderData>();
pub const XLOG_SIZE_OF_XLOG_LONG_PHD: usize = std::mem::size_of::<XLogLongPageHeaderData>();
pub const XLOG_SIZE_OF_XLOG_RECORD: usize = std::mem::size_of::<XLogRecord>();
pub const SIZE_OF_XLOG_RECORD_DATA_HEADER_SHORT: usize = 1 * 2;
pub type XLogRecPtr = u64;
pub type TimeLineID = u32;
@@ -398,7 +399,7 @@ pub fn generate_wal_segment(pg_control: &ControlFileData) -> Bytes {
xlp_magic: XLOG_PAGE_MAGIC as u16,
xlp_info: pg_constants::XLP_LONG_HEADER,
xlp_tli: 1, // FIXME: always use Postgres timeline 1
xlp_pageaddr: pg_control.checkPointCopy.redo - SizeOfXLogLongPHD as u64,
xlp_pageaddr: pg_control.checkPointCopy.redo - XLOG_SIZE_OF_XLOG_LONG_PHD as u64,
xlp_rem_len: 0,
}
},
@@ -407,7 +408,7 @@ pub fn generate_wal_segment(pg_control: &ControlFileData) -> Bytes {
xlp_xlog_blcksz: XLOG_BLCKSZ as u32,
};
let hdr_bytes = encode_xlog_long_phd(hdr);
let hdr_bytes = hdr.encode();
seg_buf.extend_from_slice(&hdr_bytes);
let rec_hdr = XLogRecord {
@@ -425,8 +426,8 @@ pub fn generate_wal_segment(pg_control: &ControlFileData) -> Bytes {
rec_shord_hdr_bytes.put_u8(pg_constants::XLR_BLOCK_ID_DATA_SHORT);
rec_shord_hdr_bytes.put_u8(SIZEOF_CHECKPOINT as u8);
let rec_bytes = encode_xlog_record(rec_hdr);
let checkpoint_bytes = encode_checkpoint(pg_control.checkPointCopy);
let rec_bytes = rec_hdr.encode();
let checkpoint_bytes = pg_control.checkPointCopy.encode();
//calculate record checksum
let mut crc = 0;

View File

@@ -1,2 +1,3 @@
#include "c.h"
#include "access/xlog_internal.h"
#include "access/xlogrecord.h"