//!
//! Parse PostgreSQL WAL records and store them in a neon Timeline.
//!
//! The pipeline for ingesting WAL looks like this:
//!
//! WAL receiver -> WalIngest -> Repository
//!
//! The WAL receiver receives a stream of WAL from the WAL safekeepers,
//! and decodes it to individual WAL records. It feeds the WAL records
//! to WalIngest, which parses them and stores them in the Repository.
//!
//! The neon Repository can store page versions in two formats: as
//! page images, or as WAL records. WalIngest::ingest_record() extracts
//! page images out of some WAL records, but stores most of them as WAL
//! records. If a WAL record modifies multiple pages, WalIngest
//! will call the Repository::put_wal_record or put_page_image functions
//! separately for each modified page.
//!
//! To reconstruct a page using a WAL record, the Repository calls the
//! code in walredo.rs. walredo.rs passes most WAL records to the WAL
//! redo Postgres process, but some records it can handle directly with
//! bespoke Rust code.
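//!
//! As a rough sketch of how this module is driven (the real loop lives in the
//! WAL receiver; `next_record_from_safekeeper` below is a hypothetical
//! stand-in for it):
//!
//! ```ignore
//! let mut walingest = WalIngest::new(&timeline, startpoint, &ctx).await?;
//! let mut decoded = DecodedWALRecord::default();
//! while let Some((lsn, recdata)) = next_record_from_safekeeper().await {
//!     let mut modification = timeline.begin_modification(lsn);
//!     walingest
//!         .ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx)
//!         .await?;
//!     modification.commit(&ctx).await?;
//! }
//! ```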

use pageserver_api::shard::ShardIdentity;
use postgres_ffi::v14::nonrelfile_utils::clogpage_precedes;
use postgres_ffi::v14::nonrelfile_utils::slru_may_delete_clogsegment;
use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn};

use anyhow::{bail, Context, Result};
use bytes::{Buf, Bytes, BytesMut};
use tracing::*;
use utils::failpoint_support;

use crate::context::RequestContext;
use crate::metrics::WAL_INGEST;
use crate::pgdatadir_mapping::{DatadirModification, Version};
use crate::tenant::PageReconstructError;
use crate::tenant::Timeline;
use crate::walrecord::*;
use crate::ZERO_PAGE;
use pageserver_api::key::rel_block_to_key;
use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
use postgres_ffi::pg_constants;
use postgres_ffi::relfile_utils::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi::v14::nonrelfile_utils::mx_offset_to_member_segment;
use postgres_ffi::v14::xlog_utils::*;
use postgres_ffi::v14::CheckPoint;
use postgres_ffi::TransactionId;
use postgres_ffi::BLCKSZ;
use utils::lsn::Lsn;

pub struct WalIngest {
    shard: ShardIdentity,
    checkpoint: CheckPoint,
    checkpoint_modified: bool,
}

impl WalIngest {
    pub async fn new(
        timeline: &Timeline,
        startpoint: Lsn,
        ctx: &RequestContext,
    ) -> anyhow::Result<WalIngest> {
        // Fetch the latest checkpoint into memory, so that we can compare with it
        // quickly in `ingest_record` and update it when it changes.
        let checkpoint_bytes = timeline.get_checkpoint(startpoint, ctx).await?;
        let checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
        trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);

        Ok(WalIngest {
            shard: *timeline.get_shard_identity(),
            checkpoint,
            checkpoint_modified: false,
        })
    }

    ///
    /// Decode a PostgreSQL WAL record and store it in the repository, in the given timeline:
    /// parse the record and call the Timeline's PUT functions for all the
    /// relations/pages that the record affects.
    ///
    /// This function updates the `lsn` field of the `DatadirModification`.
    ///
    /// Returns `true` if the record was ingested, and `false` if it was filtered out.
    ///
    pub async fn ingest_record(
        &mut self,
        recdata: Bytes,
        lsn: Lsn,
        modification: &mut DatadirModification<'_>,
        decoded: &mut DecodedWALRecord,
        ctx: &RequestContext,
    ) -> anyhow::Result<bool> {
        WAL_INGEST.records_received.inc();
        let pg_version = modification.tline.pg_version;
        let prev_len = modification.len();

        modification.set_lsn(lsn)?;
        decode_wal_record(recdata, decoded, pg_version)?;

        let mut buf = decoded.record.clone();
        buf.advance(decoded.main_data_offset);

        assert!(!self.checkpoint_modified);
        if decoded.xl_xid != pg_constants::INVALID_TRANSACTION_ID
            && self.checkpoint.update_next_xid(decoded.xl_xid)
        {
            self.checkpoint_modified = true;
        }

        failpoint_support::sleep_millis_async!("wal-ingest-record-sleep");

        match decoded.xl_rmid {
            pg_constants::RM_HEAP_ID | pg_constants::RM_HEAP2_ID => {
                // Heap AM records need some special handling, because they modify VM pages
                // without registering them with the standard mechanism.
                self.ingest_heapam_record(&mut buf, modification, decoded, ctx)
                    .await?;
            }
            pg_constants::RM_NEON_ID => {
                self.ingest_neonrmgr_record(&mut buf, modification, decoded, ctx)
                    .await?;
            }
            // Handle other special record types
            pg_constants::RM_SMGR_ID => {
                let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;

                if info == pg_constants::XLOG_SMGR_CREATE {
                    let create = XlSmgrCreate::decode(&mut buf);
                    self.ingest_xlog_smgr_create(modification, &create, ctx)
                        .await?;
                } else if info == pg_constants::XLOG_SMGR_TRUNCATE {
                    let truncate = XlSmgrTruncate::decode(&mut buf);
                    self.ingest_xlog_smgr_truncate(modification, &truncate, ctx)
                        .await?;
                }
            }
            pg_constants::RM_DBASE_ID => {
                let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
                debug!(%info, %pg_version, "handle RM_DBASE_ID");

                if pg_version == 14 {
                    if info == postgres_ffi::v14::bindings::XLOG_DBASE_CREATE {
                        let createdb = XlCreateDatabase::decode(&mut buf);
                        debug!("XLOG_DBASE_CREATE v14");

                        self.ingest_xlog_dbase_create(modification, &createdb, ctx)
                            .await?;
                    } else if info == postgres_ffi::v14::bindings::XLOG_DBASE_DROP {
                        let dropdb = XlDropDatabase::decode(&mut buf);
                        for tablespace_id in dropdb.tablespace_ids {
                            trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
                            modification
                                .drop_dbdir(tablespace_id, dropdb.db_id, ctx)
                                .await?;
                        }
                    }
                } else if pg_version == 15 {
                    if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG {
                        debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
                    } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY {
                        // The XLOG record was renamed between v14 and v15,
                        // but the record format is the same.
                        // So we can reuse XlCreateDatabase here.
                        debug!("XLOG_DBASE_CREATE_FILE_COPY");
                        let createdb = XlCreateDatabase::decode(&mut buf);
                        self.ingest_xlog_dbase_create(modification, &createdb, ctx)
                            .await?;
                    } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_DROP {
                        let dropdb = XlDropDatabase::decode(&mut buf);
                        for tablespace_id in dropdb.tablespace_ids {
                            trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
                            modification
                                .drop_dbdir(tablespace_id, dropdb.db_id, ctx)
                                .await?;
                        }
                    }
                } else if pg_version == 16 {
                    if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_WAL_LOG {
                        debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
                    } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY {
                        // The XLOG record was renamed between v14 and v15,
                        // but the record format is the same.
                        // So we can reuse XlCreateDatabase here.
                        debug!("XLOG_DBASE_CREATE_FILE_COPY");
                        let createdb = XlCreateDatabase::decode(&mut buf);
                        self.ingest_xlog_dbase_create(modification, &createdb, ctx)
                            .await?;
                    } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_DROP {
                        let dropdb = XlDropDatabase::decode(&mut buf);
                        for tablespace_id in dropdb.tablespace_ids {
                            trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
                            modification
                                .drop_dbdir(tablespace_id, dropdb.db_id, ctx)
                                .await?;
                        }
                    }
                }
            }
            pg_constants::RM_TBLSPC_ID => {
                trace!("XLOG_TBLSPC_CREATE/DROP is not handled yet");
            }
            pg_constants::RM_CLOG_ID => {
                let info = decoded.xl_info & !pg_constants::XLR_INFO_MASK;

                if info == pg_constants::CLOG_ZEROPAGE {
                    let pageno = buf.get_u32_le();
                    let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
                    let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
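                    // Illustrative arithmetic (not from a real record): SLRU
                    // segments hold SLRU_PAGES_PER_SEGMENT = 32 pages, so pageno
                    // 100 becomes segment 100 / 32 = 3, relative page 100 % 32 = 4.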
                    self.put_slru_page_image(
                        modification,
                        SlruKind::Clog,
                        segno,
                        rpageno,
                        ZERO_PAGE.clone(),
                        ctx,
                    )
                    .await?;
                } else {
                    assert!(info == pg_constants::CLOG_TRUNCATE);
                    let xlrec = XlClogTruncate::decode(&mut buf);
                    self.ingest_clog_truncate_record(modification, &xlrec, ctx)
                        .await?;
                }
            }
            pg_constants::RM_XACT_ID => {
                let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK;

                if info == pg_constants::XLOG_XACT_COMMIT || info == pg_constants::XLOG_XACT_ABORT {
                    let parsed_xact =
                        XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
                    self.ingest_xact_record(
                        modification,
                        &parsed_xact,
                        info == pg_constants::XLOG_XACT_COMMIT,
                        ctx,
                    )
                    .await?;
                } else if info == pg_constants::XLOG_XACT_COMMIT_PREPARED
                    || info == pg_constants::XLOG_XACT_ABORT_PREPARED
                {
                    let parsed_xact =
                        XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
                    self.ingest_xact_record(
                        modification,
                        &parsed_xact,
                        info == pg_constants::XLOG_XACT_COMMIT_PREPARED,
                        ctx,
                    )
                    .await?;
                    // Remove the twophase file. See RemoveTwoPhaseFile() in postgres code.
                    trace!(
                        "Drop twophaseFile for xid {} parsed_xact.xid {} here at {}",
                        decoded.xl_xid,
                        parsed_xact.xid,
                        lsn,
                    );
                    modification
                        .drop_twophase_file(parsed_xact.xid, ctx)
                        .await?;
                } else if info == pg_constants::XLOG_XACT_PREPARE {
                    modification
                        .put_twophase_file(decoded.xl_xid, Bytes::copy_from_slice(&buf[..]), ctx)
                        .await?;
                }
            }
            pg_constants::RM_MULTIXACT_ID => {
                let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;

                if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE {
                    let pageno = buf.get_u32_le();
                    let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
                    let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
                    self.put_slru_page_image(
                        modification,
                        SlruKind::MultiXactOffsets,
                        segno,
                        rpageno,
                        ZERO_PAGE.clone(),
                        ctx,
                    )
                    .await?;
                } else if info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE {
                    let pageno = buf.get_u32_le();
                    let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
                    let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
                    self.put_slru_page_image(
                        modification,
                        SlruKind::MultiXactMembers,
                        segno,
                        rpageno,
                        ZERO_PAGE.clone(),
                        ctx,
                    )
                    .await?;
                } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
                    let xlrec = XlMultiXactCreate::decode(&mut buf);
                    self.ingest_multixact_create_record(modification, &xlrec)?;
                } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
                    let xlrec = XlMultiXactTruncate::decode(&mut buf);
                    self.ingest_multixact_truncate_record(modification, &xlrec, ctx)
                        .await?;
                }
            }
            pg_constants::RM_RELMAP_ID => {
                let xlrec = XlRelmapUpdate::decode(&mut buf);
                self.ingest_relmap_page(modification, &xlrec, decoded, ctx)
                    .await?;
            }
            pg_constants::RM_XLOG_ID => {
                let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;

                if info == pg_constants::XLOG_NEXTOID {
                    let next_oid = buf.get_u32_le();
                    if self.checkpoint.nextOid != next_oid {
                        self.checkpoint.nextOid = next_oid;
                        self.checkpoint_modified = true;
                    }
                } else if info == pg_constants::XLOG_CHECKPOINT_ONLINE
                    || info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
                {
                    let mut checkpoint_bytes = [0u8; SIZEOF_CHECKPOINT];
                    buf.copy_to_slice(&mut checkpoint_bytes);
                    let xlog_checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
                    trace!(
                        "xlog_checkpoint.oldestXid={}, checkpoint.oldestXid={}",
                        xlog_checkpoint.oldestXid,
                        self.checkpoint.oldestXid
                    );
                    if (self
                        .checkpoint
                        .oldestXid
                        .wrapping_sub(xlog_checkpoint.oldestXid) as i32)
                        < 0
                    {
                        self.checkpoint.oldestXid = xlog_checkpoint.oldestXid;
                    }
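                    // The wrapping_sub/i32 comparison above is PostgreSQL's circular
                    // XID ordering. For example, if our oldestXid is 10 and the record
                    // carries 20, then 10u32.wrapping_sub(20) as i32 == -10, which is
                    // negative, so 10 precedes 20 and we adopt the newer value.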
                    trace!(
                        "xlog_checkpoint.oldestActiveXid={}, checkpoint.oldestActiveXid={}",
                        xlog_checkpoint.oldestActiveXid,
                        self.checkpoint.oldestActiveXid
                    );
                    self.checkpoint.oldestActiveXid = xlog_checkpoint.oldestActiveXid;

                    // Write a new checkpoint key-value pair on every checkpoint record, even
                    // if nothing really changed. Not strictly required, but it seems nice to
                    // have some trace of the checkpoint records in the layer files at the same
                    // LSNs.
                    self.checkpoint_modified = true;
                }
            }
            pg_constants::RM_LOGICALMSG_ID => {
                let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;

                if info == pg_constants::XLOG_LOGICAL_MESSAGE {
                    let xlrec = crate::walrecord::XlLogicalMessage::decode(&mut buf);
                    let prefix = std::str::from_utf8(&buf[0..xlrec.prefix_size - 1])?;
                    let message = &buf[xlrec.prefix_size..xlrec.prefix_size + xlrec.message_size];
                    if prefix == "neon-test" {
                        // This is a convenient way to make the WAL ingestion pause at a
                        // particular point in the WAL. For more fine-grained control,
                        // we could peek into the message and only pause if it contains
                        // a particular string, for example, but this is enough for now.
                        failpoint_support::sleep_millis_async!("wal-ingest-logical-message-sleep");
                    } else if let Some(path) = prefix.strip_prefix("neon-file:") {
                        modification.put_file(path, message, ctx).await?;
                    }
                }
            }
            pg_constants::RM_STANDBY_ID => {
                let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
                if info == pg_constants::XLOG_RUNNING_XACTS {
                    let xlrec = crate::walrecord::XlRunningXacts::decode(&mut buf);
                    self.checkpoint.oldestActiveXid = xlrec.oldest_running_xid;
                }
            }
            _x => {
                // TODO: should probably log & fail here instead of blindly
                // doing something without understanding the protocol
            }
        }

        // Iterate through all the blocks that the record modifies, and
        // "put" a separate copy of the record for each block.
        for blk in decoded.blocks.iter() {
            let rel = RelTag {
                spcnode: blk.rnode_spcnode,
                dbnode: blk.rnode_dbnode,
                relnode: blk.rnode_relnode,
                forknum: blk.forknum,
            };

            let key = rel_block_to_key(rel, blk.blkno);
            let key_is_local = self.shard.is_key_local(&key);
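
            // On a sharded tenant, each shard keeps only the blocks whose key hashes
            // to it. Records for foreign blocks are dropped below; shard 0 additionally
            // observes them so that relation sizes stay accurate.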

            tracing::debug!(
                lsn=%lsn,
                key=%key,
                "ingest: shard decision {} (checkpoint={})",
                if !key_is_local { "drop" } else { "keep" },
                self.checkpoint_modified
            );

            if !key_is_local {
                if self.shard.is_shard_zero() {
                    // Shard 0 tracks relation sizes. Although we will not store this block, we will observe
                    // its blkno in case it implicitly extends a relation.
                    self.observe_decoded_block(modification, blk, ctx).await?;
                }

                continue;
            }
            self.ingest_decoded_block(modification, lsn, decoded, blk, ctx)
                .await?;
        }

        // If checkpoint data was updated, store the new version in the repository
        if self.checkpoint_modified {
            let new_checkpoint_bytes = self.checkpoint.encode()?;

            modification.put_checkpoint(new_checkpoint_bytes)?;
            self.checkpoint_modified = false;
        }

        // Note that at this point this record is only cached in the modification
        // until commit() is called to flush the data into the repository and update
        // the latest LSN.

        Ok(modification.len() > prev_len)
    }

    /// Do not store this block, but observe it for the purposes of updating our relation size state.
    async fn observe_decoded_block(
        &mut self,
        modification: &mut DatadirModification<'_>,
        blk: &DecodedBkpBlock,
        ctx: &RequestContext,
    ) -> Result<(), PageReconstructError> {
        let rel = RelTag {
            spcnode: blk.rnode_spcnode,
            dbnode: blk.rnode_dbnode,
            relnode: blk.rnode_relnode,
            forknum: blk.forknum,
        };
        self.handle_rel_extend(modification, rel, blk.blkno, ctx)
            .await
    }

    async fn ingest_decoded_block(
        &mut self,
        modification: &mut DatadirModification<'_>,
        lsn: Lsn,
        decoded: &DecodedWALRecord,
        blk: &DecodedBkpBlock,
        ctx: &RequestContext,
    ) -> Result<(), PageReconstructError> {
        let rel = RelTag {
            spcnode: blk.rnode_spcnode,
            dbnode: blk.rnode_dbnode,
            relnode: blk.rnode_relnode,
            forknum: blk.forknum,
        };

        //
        // Instead of storing a full-page-image WAL record, it is better to store
        // the extracted image: we can skip wal-redo in this case. Also, some FPI
        // records may contain multiple (up to 32) pages, so they would have to be
        // copied multiple times.
        //
        if blk.apply_image
            && blk.has_image
            && decoded.xl_rmid == pg_constants::RM_XLOG_ID
            && (decoded.xl_info == pg_constants::XLOG_FPI
                || decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
            // compression of WAL is not yet supported: fall back to storing the original WAL record
            && !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, modification.tline.pg_version)?
            // do not materialize null pages, because they will most likely soon be replaced with real data
            && blk.bimg_len != 0
        {
            // Extract page image from FPI record
            let img_len = blk.bimg_len as usize;
            let img_offs = blk.bimg_offset as usize;
            let mut image = BytesMut::with_capacity(BLCKSZ as usize);
            image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]);

            if blk.hole_length != 0 {
                let tail = image.split_off(blk.hole_offset as usize);
                image.resize(image.len() + blk.hole_length as usize, 0u8);
                image.unsplit(tail);
            }
            //
            // Match the logic of XLogReadBufferForRedoExtended:
            // The page may be uninitialized. If so, we can't set the LSN because
            // that would corrupt the page.
            //
            if !page_is_new(&image) {
                page_set_lsn(&mut image, lsn)
            }
            assert_eq!(image.len(), BLCKSZ as usize);
            self.put_rel_page_image(modification, rel, blk.blkno, image.freeze(), ctx)
                .await?;
        } else {
            let rec = NeonWalRecord::Postgres {
                will_init: blk.will_init || blk.apply_image,
                rec: decoded.record.clone(),
            };
            self.put_rel_wal_record(modification, rel, blk.blkno, rec, ctx)
                .await?;
        }
        Ok(())
    }

    async fn ingest_heapam_record(
        &mut self,
        buf: &mut Bytes,
        modification: &mut DatadirModification<'_>,
        decoded: &DecodedWALRecord,
        ctx: &RequestContext,
    ) -> anyhow::Result<()> {
        // Handle VM bit updates that are implicitly part of heap records.

        // First, look at the record to determine which VM bits need
        // to be cleared. If either of these variables is set, we
        // need to clear the corresponding bits in the visibility map.
        let mut new_heap_blkno: Option<u32> = None;
        let mut old_heap_blkno: Option<u32> = None;
        let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;

        match modification.tline.pg_version {
            14 => {
                if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
                    let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;

                    if info == pg_constants::XLOG_HEAP_INSERT {
                        let xlrec = v14::XlHeapInsert::decode(buf);
                        assert_eq!(0, buf.remaining());
                        if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
                            new_heap_blkno = Some(decoded.blocks[0].blkno);
                        }
                    } else if info == pg_constants::XLOG_HEAP_DELETE {
                        let xlrec = v14::XlHeapDelete::decode(buf);
                        if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
                            new_heap_blkno = Some(decoded.blocks[0].blkno);
                        }
                    } else if info == pg_constants::XLOG_HEAP_UPDATE
                        || info == pg_constants::XLOG_HEAP_HOT_UPDATE
                    {
                        let xlrec = v14::XlHeapUpdate::decode(buf);
                        // the size of tuple data is inferred from the size of the record.
                        // we can't validate the remaining number of bytes without parsing
                        // the tuple data.
                        if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
                            old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
                        }
                        if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
                            // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
                            // non-HOT update where the new tuple goes to a different page than
                            // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
                            // set.
                            new_heap_blkno = Some(decoded.blocks[0].blkno);
                        }
                    } else if info == pg_constants::XLOG_HEAP_LOCK {
                        let xlrec = v14::XlHeapLock::decode(buf);
                        if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
                            old_heap_blkno = Some(decoded.blocks[0].blkno);
                            flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
                        }
                    }
                } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
                    let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
                    if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
                        let xlrec = v14::XlHeapMultiInsert::decode(buf);

                        let offset_array_len =
                            if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
                                // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
                                0
                            } else {
                                std::mem::size_of::<u16>() * xlrec.ntuples as usize
                            };
                        assert_eq!(offset_array_len, buf.remaining());

                        if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
                            new_heap_blkno = Some(decoded.blocks[0].blkno);
                        }
                    } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
                        let xlrec = v14::XlHeapLockUpdated::decode(buf);
                        if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
                            old_heap_blkno = Some(decoded.blocks[0].blkno);
                            flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
                        }
                    }
                } else {
                    bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
                }
            }
            15 => {
                if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
                    let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;

                    if info == pg_constants::XLOG_HEAP_INSERT {
                        let xlrec = v15::XlHeapInsert::decode(buf);
                        assert_eq!(0, buf.remaining());
                        if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
                            new_heap_blkno = Some(decoded.blocks[0].blkno);
                        }
                    } else if info == pg_constants::XLOG_HEAP_DELETE {
                        let xlrec = v15::XlHeapDelete::decode(buf);
                        if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
                            new_heap_blkno = Some(decoded.blocks[0].blkno);
                        }
                    } else if info == pg_constants::XLOG_HEAP_UPDATE
                        || info == pg_constants::XLOG_HEAP_HOT_UPDATE
                    {
                        let xlrec = v15::XlHeapUpdate::decode(buf);
                        // the size of tuple data is inferred from the size of the record.
                        // we can't validate the remaining number of bytes without parsing
                        // the tuple data.
                        if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
                            old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
                        }
                        if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
                            // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
                            // non-HOT update where the new tuple goes to a different page than
                            // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
                            // set.
                            new_heap_blkno = Some(decoded.blocks[0].blkno);
                        }
                    } else if info == pg_constants::XLOG_HEAP_LOCK {
                        let xlrec = v15::XlHeapLock::decode(buf);
                        if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
                            old_heap_blkno = Some(decoded.blocks[0].blkno);
                            flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
                        }
                    }
                } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
                    let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
                    if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
                        let xlrec = v15::XlHeapMultiInsert::decode(buf);

                        let offset_array_len =
                            if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
                                // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
                                0
                            } else {
                                std::mem::size_of::<u16>() * xlrec.ntuples as usize
                            };
                        assert_eq!(offset_array_len, buf.remaining());

                        if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
                            new_heap_blkno = Some(decoded.blocks[0].blkno);
                        }
                    } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
                        let xlrec = v15::XlHeapLockUpdated::decode(buf);
                        if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
                            old_heap_blkno = Some(decoded.blocks[0].blkno);
                            flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
                        }
                    }
                } else {
                    bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
                }
            }
            16 => {
                if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
                    let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;

                    if info == pg_constants::XLOG_HEAP_INSERT {
                        let xlrec = v16::XlHeapInsert::decode(buf);
                        assert_eq!(0, buf.remaining());
                        if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
                            new_heap_blkno = Some(decoded.blocks[0].blkno);
                        }
                    } else if info == pg_constants::XLOG_HEAP_DELETE {
                        let xlrec = v16::XlHeapDelete::decode(buf);
                        if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
                            new_heap_blkno = Some(decoded.blocks[0].blkno);
                        }
                    } else if info == pg_constants::XLOG_HEAP_UPDATE
                        || info == pg_constants::XLOG_HEAP_HOT_UPDATE
                    {
                        let xlrec = v16::XlHeapUpdate::decode(buf);
                        // the size of tuple data is inferred from the size of the record.
                        // we can't validate the remaining number of bytes without parsing
                        // the tuple data.
                        if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
                            old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
                        }
                        if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
                            // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
                            // non-HOT update where the new tuple goes to a different page than
                            // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
                            // set.
                            new_heap_blkno = Some(decoded.blocks[0].blkno);
                        }
                    } else if info == pg_constants::XLOG_HEAP_LOCK {
                        let xlrec = v16::XlHeapLock::decode(buf);
                        if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
                            old_heap_blkno = Some(decoded.blocks[0].blkno);
                            flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
                        }
                    }
                } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
                    let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
                    if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
                        let xlrec = v16::XlHeapMultiInsert::decode(buf);

                        let offset_array_len =
                            if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
                                // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
                                0
                            } else {
                                std::mem::size_of::<u16>() * xlrec.ntuples as usize
                            };
                        assert_eq!(offset_array_len, buf.remaining());

                        if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
                            new_heap_blkno = Some(decoded.blocks[0].blkno);
                        }
                    } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
                        let xlrec = v16::XlHeapLockUpdated::decode(buf);
                        if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
                            old_heap_blkno = Some(decoded.blocks[0].blkno);
                            flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
                        }
                    }
                } else {
                    bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
                }
            }
            _ => {}
        }

        // Clear the VM bits if required.
        if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
            let vm_rel = RelTag {
                forknum: VISIBILITYMAP_FORKNUM,
                spcnode: decoded.blocks[0].rnode_spcnode,
                dbnode: decoded.blocks[0].rnode_dbnode,
                relnode: decoded.blocks[0].rnode_relnode,
            };

            let mut new_vm_blk = new_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
            let mut old_vm_blk = old_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
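
            // HEAPBLK_TO_MAPBLOCK maps a heap block number to the VM page holding its
            // bits. The old and new heap blocks of an UPDATE can therefore land on the
            // same VM page or on two different ones; both cases are handled below.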

            // Sometimes, Postgres seems to create heap WAL records with the
            // ALL_VISIBLE_CLEARED flag set, even though the bit in the VM page is
            // not set. In fact, it's possible that the VM page does not exist at all.
            // In that case, we don't want to store a record to clear the VM bit;
            // replaying it would fail to find the previous image of the page, because
            // it doesn't exist. So check if the VM page(s) exist, and skip the WAL
            // record if they don't.
            let vm_size = get_relsize(modification, vm_rel, ctx).await?;
            if let Some(blknum) = new_vm_blk {
                if blknum >= vm_size {
                    new_vm_blk = None;
                }
            }
            if let Some(blknum) = old_vm_blk {
                if blknum >= vm_size {
                    old_vm_blk = None;
                }
            }

            if new_vm_blk.is_some() || old_vm_blk.is_some() {
                if new_vm_blk == old_vm_blk {
                    // An UPDATE record that needs to clear the bits for both old and the
                    // new page, both of which reside on the same VM page.
                    self.put_rel_wal_record(
                        modification,
                        vm_rel,
                        new_vm_blk.unwrap(),
                        NeonWalRecord::ClearVisibilityMapFlags {
                            new_heap_blkno,
                            old_heap_blkno,
                            flags,
                        },
                        ctx,
                    )
                    .await?;
                } else {
                    // Clear VM bits for one heap page, or for two pages that reside on
                    // different VM pages.
                    if let Some(new_vm_blk) = new_vm_blk {
                        self.put_rel_wal_record(
                            modification,
                            vm_rel,
                            new_vm_blk,
                            NeonWalRecord::ClearVisibilityMapFlags {
                                new_heap_blkno,
                                old_heap_blkno: None,
                                flags,
                            },
                            ctx,
                        )
                        .await?;
                    }
                    if let Some(old_vm_blk) = old_vm_blk {
                        self.put_rel_wal_record(
                            modification,
                            vm_rel,
                            old_vm_blk,
                            NeonWalRecord::ClearVisibilityMapFlags {
                                new_heap_blkno: None,
                                old_heap_blkno,
                                flags,
                            },
                            ctx,
                        )
                        .await?;
                    }
                }
            }
        }

        Ok(())
    }

    async fn ingest_neonrmgr_record(
        &mut self,
        buf: &mut Bytes,
        modification: &mut DatadirModification<'_>,
        decoded: &DecodedWALRecord,
        ctx: &RequestContext,
    ) -> anyhow::Result<()> {
        // Handle VM bit updates that are implicitly part of heap records.

        // First, look at the record to determine which VM bits need
        // to be cleared. If either of these variables is set, we
        // need to clear the corresponding bits in the visibility map.
        let mut new_heap_blkno: Option<u32> = None;
        let mut old_heap_blkno: Option<u32> = None;
        let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
        let pg_version = modification.tline.pg_version;

        assert_eq!(decoded.xl_rmid, pg_constants::RM_NEON_ID);

        match pg_version {
            16 => {
                let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;

                match info {
                    pg_constants::XLOG_NEON_HEAP_INSERT => {
                        let xlrec = v16::rm_neon::XlNeonHeapInsert::decode(buf);
                        assert_eq!(0, buf.remaining());
                        if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
                            new_heap_blkno = Some(decoded.blocks[0].blkno);
                        }
                    }
                    pg_constants::XLOG_NEON_HEAP_DELETE => {
                        let xlrec = v16::rm_neon::XlNeonHeapDelete::decode(buf);
                        if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
                            new_heap_blkno = Some(decoded.blocks[0].blkno);
                        }
                    }
                    pg_constants::XLOG_NEON_HEAP_UPDATE
                    | pg_constants::XLOG_NEON_HEAP_HOT_UPDATE => {
                        let xlrec = v16::rm_neon::XlNeonHeapUpdate::decode(buf);
                        // the size of tuple data is inferred from the size of the record.
                        // we can't validate the remaining number of bytes without parsing
                        // the tuple data.
                        if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
                            old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
                        }
                        if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
                            // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
                            // non-HOT update where the new tuple goes to a different page than
                            // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
                            // set.
                            new_heap_blkno = Some(decoded.blocks[0].blkno);
                        }
                    }
                    pg_constants::XLOG_NEON_HEAP_MULTI_INSERT => {
                        let xlrec = v16::rm_neon::XlNeonHeapMultiInsert::decode(buf);

                        let offset_array_len =
                            if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 {
                                // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set
                                0
                            } else {
                                std::mem::size_of::<u16>() * xlrec.ntuples as usize
                            };
                        assert_eq!(offset_array_len, buf.remaining());

                        if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
                            new_heap_blkno = Some(decoded.blocks[0].blkno);
                        }
                    }
                    pg_constants::XLOG_NEON_HEAP_LOCK => {
                        let xlrec = v16::rm_neon::XlNeonHeapLock::decode(buf);
                        if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
                            old_heap_blkno = Some(decoded.blocks[0].blkno);
                            flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
                        }
                    }
                    info => bail!("Unknown WAL record type for Neon RMGR: {}", info),
                }
            }
            _ => bail!(
                "Neon RMGR has no known compatibility with PostgreSQL version {}",
                pg_version
            ),
        }

        // Clear the VM bits if required.
        if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
            let vm_rel = RelTag {
                forknum: VISIBILITYMAP_FORKNUM,
                spcnode: decoded.blocks[0].rnode_spcnode,
                dbnode: decoded.blocks[0].rnode_dbnode,
                relnode: decoded.blocks[0].rnode_relnode,
            };

            let mut new_vm_blk = new_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
            let mut old_vm_blk = old_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);

            // Sometimes, Postgres seems to create heap WAL records with the
            // ALL_VISIBLE_CLEARED flag set, even though the bit in the VM page is
            // not set. In fact, it's possible that the VM page does not exist at all.
            // In that case, we don't want to store a record to clear the VM bit;
            // replaying it would fail to find the previous image of the page, because
            // it doesn't exist. So check if the VM page(s) exist, and skip the WAL
            // record if they don't.
            let vm_size = get_relsize(modification, vm_rel, ctx).await?;
            if let Some(blknum) = new_vm_blk {
                if blknum >= vm_size {
                    new_vm_blk = None;
                }
            }
            if let Some(blknum) = old_vm_blk {
                if blknum >= vm_size {
                    old_vm_blk = None;
                }
            }

            if new_vm_blk.is_some() || old_vm_blk.is_some() {
                if new_vm_blk == old_vm_blk {
                    // An UPDATE record that needs to clear the bits for both old and the
                    // new page, both of which reside on the same VM page.
                    self.put_rel_wal_record(
                        modification,
                        vm_rel,
                        new_vm_blk.unwrap(),
                        NeonWalRecord::ClearVisibilityMapFlags {
                            new_heap_blkno,
                            old_heap_blkno,
                            flags,
                        },
                        ctx,
                    )
                    .await?;
                } else {
                    // Clear VM bits for one heap page, or for two pages that reside on
                    // different VM pages.
                    if let Some(new_vm_blk) = new_vm_blk {
                        self.put_rel_wal_record(
                            modification,
                            vm_rel,
                            new_vm_blk,
                            NeonWalRecord::ClearVisibilityMapFlags {
                                new_heap_blkno,
                                old_heap_blkno: None,
                                flags,
                            },
                            ctx,
                        )
                        .await?;
                    }
                    if let Some(old_vm_blk) = old_vm_blk {
                        self.put_rel_wal_record(
                            modification,
                            vm_rel,
                            old_vm_blk,
                            NeonWalRecord::ClearVisibilityMapFlags {
                                new_heap_blkno: None,
                                old_heap_blkno,
                                flags,
                            },
                            ctx,
                        )
                        .await?;
                    }
                }
            }
        }

        Ok(())
    }

    /// Subroutine of ingest_record(), to handle an XLOG_DBASE_CREATE record.
    async fn ingest_xlog_dbase_create(
        &mut self,
        modification: &mut DatadirModification<'_>,
        rec: &XlCreateDatabase,
        ctx: &RequestContext,
    ) -> anyhow::Result<()> {
        let db_id = rec.db_id;
        let tablespace_id = rec.tablespace_id;
        let src_db_id = rec.src_db_id;
        let src_tablespace_id = rec.src_tablespace_id;

        let rels = modification
            .tline
            .list_rels(
                src_tablespace_id,
                src_db_id,
                Version::Modified(modification),
                ctx,
            )
            .await?;

        debug!("ingest_xlog_dbase_create: {} rels", rels.len());

        // Copy relfilemap
        let filemap = modification
            .tline
            .get_relmap_file(
                src_tablespace_id,
                src_db_id,
                Version::Modified(modification),
                ctx,
            )
            .await?;
        modification
            .put_relmap_file(tablespace_id, db_id, filemap, ctx)
            .await?;

        let mut num_rels_copied = 0;
        let mut num_blocks_copied = 0;
        for src_rel in rels {
            assert_eq!(src_rel.spcnode, src_tablespace_id);
            assert_eq!(src_rel.dbnode, src_db_id);

            let nblocks = modification
                .tline
                .get_rel_size(src_rel, Version::Modified(modification), ctx)
                .await?;
            let dst_rel = RelTag {
                spcnode: tablespace_id,
                dbnode: db_id,
                relnode: src_rel.relnode,
                forknum: src_rel.forknum,
            };

            modification.put_rel_creation(dst_rel, nblocks, ctx).await?;

            // Copy content
            debug!("copying rel {} to {}, {} blocks", src_rel, dst_rel, nblocks);
            for blknum in 0..nblocks {
                // Sharding:
                //  - src and dst are always on the same shard, because they differ only by dbNode, and
                //    dbNode is not included in the hash inputs for sharding.
                //  - This WAL command is replayed on all shards, but each shard only copies the blocks
                //    that belong to it.
                let src_key = rel_block_to_key(src_rel, blknum);
                if !self.shard.is_key_local(&src_key) {
                    debug!(
                        "Skipping non-local key {} during XLOG_DBASE_CREATE",
                        src_key
                    );
                    continue;
                }
                debug!(
                    "copying block {} from {} ({}) to {}",
                    blknum, src_rel, src_key, dst_rel
                );

                let content = modification
                    .tline
                    .get_rel_page_at_lsn(src_rel, blknum, Version::Modified(modification), ctx)
                    .await?;
                modification.put_rel_page_image(dst_rel, blknum, content)?;
                num_blocks_copied += 1;
            }

            num_rels_copied += 1;
        }

        info!(
            "Created database {}/{}, copied {} blocks in {} rels",
            tablespace_id, db_id, num_blocks_copied, num_rels_copied
        );
        Ok(())
    }

    async fn ingest_xlog_smgr_create(
        &mut self,
        modification: &mut DatadirModification<'_>,
        rec: &XlSmgrCreate,
        ctx: &RequestContext,
    ) -> anyhow::Result<()> {
        let rel = RelTag {
            spcnode: rec.rnode.spcnode,
            dbnode: rec.rnode.dbnode,
            relnode: rec.rnode.relnode,
            forknum: rec.forknum,
        };
        self.put_rel_creation(modification, rel, ctx).await?;
        Ok(())
    }

    /// Subroutine of ingest_record(), to handle an XLOG_SMGR_TRUNCATE record.
    ///
    /// This is the same logic as in PostgreSQL's smgr_redo() function.
    async fn ingest_xlog_smgr_truncate(
        &mut self,
        modification: &mut DatadirModification<'_>,
        rec: &XlSmgrTruncate,
        ctx: &RequestContext,
    ) -> anyhow::Result<()> {
        let spcnode = rec.rnode.spcnode;
        let dbnode = rec.rnode.dbnode;
        let relnode = rec.rnode.relnode;

        if (rec.flags & pg_constants::SMGR_TRUNCATE_HEAP) != 0 {
            let rel = RelTag {
                spcnode,
                dbnode,
                relnode,
                forknum: MAIN_FORKNUM,
            };
            self.put_rel_truncation(modification, rel, rec.blkno, ctx)
                .await?;
        }
        if (rec.flags & pg_constants::SMGR_TRUNCATE_FSM) != 0 {
            let rel = RelTag {
                spcnode,
                dbnode,
                relnode,
                forknum: FSM_FORKNUM,
            };

            let fsm_logical_page_no = rec.blkno / pg_constants::SLOTS_PER_FSM_PAGE;
            let mut fsm_physical_page_no = fsm_logical_to_physical(fsm_logical_page_no);
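            // Each FSM leaf page tracks SLOTS_PER_FSM_PAGE heap blocks, and the FSM
            // fork is laid out as a tree, so the logical leaf page number computed
            // above must be translated into a physical page number before truncating.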
            if rec.blkno % pg_constants::SLOTS_PER_FSM_PAGE != 0 {
                // The tail of the last remaining FSM page has to be zeroed.
                // We are not precise here: instead of digging into the FSM bitmap format, we just clear the whole page.
                modification.put_rel_page_image(rel, fsm_physical_page_no, ZERO_PAGE.clone())?;
                fsm_physical_page_no += 1;
            }
            let nblocks = get_relsize(modification, rel, ctx).await?;
            if nblocks > fsm_physical_page_no {
                // Check if there is something to do: the FSM is larger than the truncate position.
                self.put_rel_truncation(modification, rel, fsm_physical_page_no, ctx)
                    .await?;
            }
        }
        if (rec.flags & pg_constants::SMGR_TRUNCATE_VM) != 0 {
            let rel = RelTag {
                spcnode,
                dbnode,
                relnode,
                forknum: VISIBILITYMAP_FORKNUM,
            };

            let mut vm_page_no = rec.blkno / pg_constants::VM_HEAPBLOCKS_PER_PAGE;
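            // The visibility map stores two bits per heap block (all-visible and
            // all-frozen), so VM_HEAPBLOCKS_PER_PAGE consecutive heap blocks share
            // one VM page; rec.blkno is converted accordingly above.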
            if rec.blkno % pg_constants::VM_HEAPBLOCKS_PER_PAGE != 0 {
                // The tail of the last remaining VM page has to be zeroed.
                // We are not precise here: instead of digging into the VM bitmap format, we just clear the whole page.
                modification.put_rel_page_image(rel, vm_page_no, ZERO_PAGE.clone())?;
                vm_page_no += 1;
            }
            let nblocks = get_relsize(modification, rel, ctx).await?;
            if nblocks > vm_page_no {
                // Check if there is something to do: the VM is larger than the truncate position.
                self.put_rel_truncation(modification, rel, vm_page_no, ctx)
                    .await?;
            }
        }
        Ok(())
    }

    /// Subroutine of ingest_record(), to handle XLOG_XACT_* records.
    ///
    async fn ingest_xact_record(
        &mut self,
        modification: &mut DatadirModification<'_>,
        parsed: &XlXactParsedRecord,
        is_commit: bool,
        ctx: &RequestContext,
    ) -> anyhow::Result<()> {
        // Record update of CLOG pages
        let mut pageno = parsed.xid / pg_constants::CLOG_XACTS_PER_PAGE;
        let mut segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
        let mut rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
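        // Worked example (illustrative values): with CLOG_XACTS_PER_PAGE = 32768 and
        // SLRU_PAGES_PER_SEGMENT = 32, xid 1_000_000 maps to CLOG page 30, which is
        // relative page 30 of segment 0.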
        let mut page_xids: Vec<TransactionId> = vec![parsed.xid];

        for subxact in &parsed.subxacts {
            let subxact_pageno = subxact / pg_constants::CLOG_XACTS_PER_PAGE;
            if subxact_pageno != pageno {
                // This subxact goes to a different page. Write the record
                // for all the XIDs on the previous page, and continue
                // accumulating XIDs on this new page.
                modification.put_slru_wal_record(
                    SlruKind::Clog,
                    segno,
                    rpageno,
                    if is_commit {
                        NeonWalRecord::ClogSetCommitted {
                            xids: page_xids,
                            timestamp: parsed.xact_time,
                        }
                    } else {
                        NeonWalRecord::ClogSetAborted { xids: page_xids }
                    },
                )?;
                page_xids = Vec::new();
            }
            pageno = subxact_pageno;
            segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
            rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
            page_xids.push(*subxact);
        }
        modification.put_slru_wal_record(
            SlruKind::Clog,
            segno,
            rpageno,
            if is_commit {
                NeonWalRecord::ClogSetCommitted {
                    xids: page_xids,
                    timestamp: parsed.xact_time,
                }
            } else {
                NeonWalRecord::ClogSetAborted { xids: page_xids }
            },
        )?;

        for xnode in &parsed.xnodes {
            for forknum in MAIN_FORKNUM..=INIT_FORKNUM {
                let rel = RelTag {
                    forknum,
                    spcnode: xnode.spcnode,
                    dbnode: xnode.dbnode,
                    relnode: xnode.relnode,
                };
                if modification
                    .tline
                    .get_rel_exists(rel, Version::Modified(modification), ctx)
                    .await?
                {
                    self.put_rel_drop(modification, rel, ctx).await?;
                }
            }
        }
        Ok(())
    }

    async fn ingest_clog_truncate_record(
        &mut self,
        modification: &mut DatadirModification<'_>,
        xlrec: &XlClogTruncate,
        ctx: &RequestContext,
    ) -> anyhow::Result<()> {
        info!(
            "RM_CLOG_ID truncate pageno {} oldestXid {} oldestXidDB {}",
            xlrec.pageno, xlrec.oldest_xid, xlrec.oldest_xid_db
        );

        // Here we treat oldestXid and oldestXidDB differently from the
        // postgres redo routines. In postgres, checkpoint.oldestXid lags
        // behind xlrec.oldest_xid until a checkpoint happens and updates
        // the value. Here we can use the most recent value.
        // It's just an optimization, though, and can be removed.
        // TODO Figure out if there will be any issues with replica.
        self.checkpoint.oldestXid = xlrec.oldest_xid;
        self.checkpoint.oldestXidDB = xlrec.oldest_xid_db;
        self.checkpoint_modified = true;

        // TODO Treat AdvanceOldestClogXid() or write a comment why we don't need it

        let latest_page_number =
            self.checkpoint.nextXid.value as u32 / pg_constants::CLOG_XACTS_PER_PAGE;

        // Now delete all segments containing pages between xlrec.pageno
        // and latest_page_number.

        // First, make an important safety check:
        // the current endpoint page must not be eligible for removal.
        // See SimpleLruTruncate() in slru.c
        if clogpage_precedes(latest_page_number, xlrec.pageno) {
            info!("could not truncate directory pg_xact apparent wraparound");
            return Ok(());
        }

        // Iterate over SLRU CLOG segments and drop segments that we're ready to truncate
        //
        // We cannot pass 'lsn' to the Timeline.list_nonrels(), or it
        // will block waiting for the last valid LSN to advance up to
        // it. So we use the previous record's LSN in the get calls
        // instead.
        for segno in modification
            .tline
            .list_slru_segments(SlruKind::Clog, Version::Modified(modification), ctx)
            .await?
        {
            let segpage = segno * pg_constants::SLRU_PAGES_PER_SEGMENT;
            if slru_may_delete_clogsegment(segpage, xlrec.pageno) {
                modification
                    .drop_slru_segment(SlruKind::Clog, segno, ctx)
                    .await?;
                trace!("Drop CLOG segment {:>04X}", segno);
            }
        }

        Ok(())
    }

    fn ingest_multixact_create_record(
        &mut self,
        modification: &mut DatadirModification,
        xlrec: &XlMultiXactCreate,
    ) -> Result<()> {
        // Create WAL record for updating the multixact-offsets page
        let pageno = xlrec.mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
        let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
        let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
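
        // Illustrative numbers: a multixact offset is 4 bytes, so with 8 KiB pages
        // MULTIXACT_OFFSETS_PER_PAGE is 2048 and mid 5000 lands on page 2
        // (segment 0, relative page 2).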
        modification.put_slru_wal_record(
            SlruKind::MultiXactOffsets,
            segno,
            rpageno,
            NeonWalRecord::MultixactOffsetCreate {
                mid: xlrec.mid,
                moff: xlrec.moff,
            },
        )?;

        // Create WAL records for the update of each affected multixact-members page
        let mut members = xlrec.members.iter();
        let mut offset = xlrec.moff;
        loop {
            let pageno = offset / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;

            // How many members fit on this page?
            let page_remain = pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32
                - offset % pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;

            let mut this_page_members: Vec<MultiXactMember> = Vec::new();
            for _ in 0..page_remain {
                if let Some(m) = members.next() {
                    this_page_members.push(m.clone());
                } else {
                    break;
                }
            }
            if this_page_members.is_empty() {
                // all done
                break;
            }
            let n_this_page = this_page_members.len();

            modification.put_slru_wal_record(
                SlruKind::MultiXactMembers,
                pageno / pg_constants::SLRU_PAGES_PER_SEGMENT,
                pageno % pg_constants::SLRU_PAGES_PER_SEGMENT,
                NeonWalRecord::MultixactMembersCreate {
                    moff: offset,
                    members: this_page_members,
                },
            )?;

            // Note: The multixact members can wrap around, even within one WAL record.
            offset = offset.wrapping_add(n_this_page as u32);
        }
        if xlrec.mid >= self.checkpoint.nextMulti {
            self.checkpoint.nextMulti = xlrec.mid + 1;
            self.checkpoint_modified = true;
        }
        if xlrec.moff + xlrec.nmembers > self.checkpoint.nextMultiOffset {
            self.checkpoint.nextMultiOffset = xlrec.moff + xlrec.nmembers;
            self.checkpoint_modified = true;
        }
        let max_mbr_xid = xlrec.members.iter().fold(None, |acc, mbr| {
            if let Some(max_xid) = acc {
                if mbr.xid.wrapping_sub(max_xid) as i32 > 0 {
                    Some(mbr.xid)
                } else {
                    acc
                }
            } else {
                Some(mbr.xid)
            }
        });

        if let Some(max_xid) = max_mbr_xid {
            if self.checkpoint.update_next_xid(max_xid) {
                self.checkpoint_modified = true;
            }
        }
        Ok(())
    }

    async fn ingest_multixact_truncate_record(
        &mut self,
        modification: &mut DatadirModification<'_>,
        xlrec: &XlMultiXactTruncate,
        ctx: &RequestContext,
    ) -> Result<()> {
        self.checkpoint.oldestMulti = xlrec.end_trunc_off;
        self.checkpoint.oldestMultiDB = xlrec.oldest_multi_db;
        self.checkpoint_modified = true;

        // PerformMembersTruncation
        let maxsegment: i32 = mx_offset_to_member_segment(pg_constants::MAX_MULTIXACT_OFFSET);
        let startsegment: i32 = mx_offset_to_member_segment(xlrec.start_trunc_memb);
        let endsegment: i32 = mx_offset_to_member_segment(xlrec.end_trunc_memb);
        let mut segment: i32 = startsegment;

        // Delete all the segments except the last one. The last segment can still
        // contain, possibly partially, valid data.
        while segment != endsegment {
            modification
                .drop_slru_segment(SlruKind::MultiXactMembers, segment as u32, ctx)
                .await?;

            /* move to next segment, handling wraparound correctly */
            if segment == maxsegment {
                segment = 0;
            } else {
                segment += 1;
            }
        }

        // Truncate offsets
        // FIXME: this did not handle wraparound correctly

        Ok(())
    }

    async fn ingest_relmap_page(
        &mut self,
        modification: &mut DatadirModification<'_>,
        xlrec: &XlRelmapUpdate,
        decoded: &DecodedWALRecord,
        ctx: &RequestContext,
    ) -> Result<()> {
        let mut buf = decoded.record.clone();
        buf.advance(decoded.main_data_offset);
        // skip xl_relmap_update
        buf.advance(12);

        modification
            .put_relmap_file(
                xlrec.tsid,
                xlrec.dbid,
                Bytes::copy_from_slice(&buf[..]),
                ctx,
            )
            .await
    }

    async fn put_rel_creation(
        &mut self,
        modification: &mut DatadirModification<'_>,
        rel: RelTag,
        ctx: &RequestContext,
    ) -> Result<()> {
        modification.put_rel_creation(rel, 0, ctx).await?;
        Ok(())
    }

    async fn put_rel_page_image(
        &mut self,
        modification: &mut DatadirModification<'_>,
        rel: RelTag,
        blknum: BlockNumber,
        img: Bytes,
        ctx: &RequestContext,
    ) -> Result<(), PageReconstructError> {
        self.handle_rel_extend(modification, rel, blknum, ctx)
            .await?;
        modification.put_rel_page_image(rel, blknum, img)?;
        Ok(())
    }

    async fn put_rel_wal_record(
        &mut self,
        modification: &mut DatadirModification<'_>,
        rel: RelTag,
        blknum: BlockNumber,
        rec: NeonWalRecord,
        ctx: &RequestContext,
    ) -> Result<()> {
        self.handle_rel_extend(modification, rel, blknum, ctx)
            .await?;
        modification.put_rel_wal_record(rel, blknum, rec)?;
        Ok(())
    }

    async fn put_rel_truncation(
        &mut self,
        modification: &mut DatadirModification<'_>,
        rel: RelTag,
        nblocks: BlockNumber,
        ctx: &RequestContext,
    ) -> anyhow::Result<()> {
        modification.put_rel_truncation(rel, nblocks, ctx).await?;
        Ok(())
    }

    async fn put_rel_drop(
        &mut self,
        modification: &mut DatadirModification<'_>,
        rel: RelTag,
        ctx: &RequestContext,
    ) -> Result<()> {
        modification.put_rel_drop(rel, ctx).await?;
        Ok(())
    }

    async fn handle_rel_extend(
        &mut self,
        modification: &mut DatadirModification<'_>,
        rel: RelTag,
        blknum: BlockNumber,
        ctx: &RequestContext,
    ) -> Result<(), PageReconstructError> {
        let new_nblocks = blknum + 1;
        // Check if the relation exists. We implicitly create relations on first
        // record.
        // TODO: would be nice to be more explicit about it

        // Get current size and put rel creation if rel doesn't exist
        //
        // NOTE: we check the cache first even though get_rel_exists and get_rel_size would
        // check the cache too. This is because eagerly checking the cache results in
        // less work overall and 10% better performance. It's more work on cache miss
        // but cache miss is rare.
        let old_nblocks = if let Some(nblocks) = modification
            .tline
            .get_cached_rel_size(&rel, modification.get_lsn())
        {
            nblocks
        } else if !modification
            .tline
            .get_rel_exists(rel, Version::Modified(modification), ctx)
            .await?
        {
            // create it with 0 size initially, the logic below will extend it
            modification
                .put_rel_creation(rel, 0, ctx)
                .await
                .context("Relation Error")?;
            0
        } else {
            modification
                .tline
                .get_rel_size(rel, Version::Modified(modification), ctx)
                .await?
        };

        if new_nblocks > old_nblocks {
            //info!("extending {} {} to {}", rel, old_nblocks, new_nblocks);
            modification.put_rel_extend(rel, new_nblocks, ctx).await?;
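
            // Example of the gap fill below: if the relation had 3 blocks and this
            // record writes block 7, blocks 3 through 6 are filled with zero pages
            // (on the shards that own them) so the relation has no unreadable holes.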
let mut key = rel_block_to_key(rel, blknum);
|
|
// fill the gap with zeros
|
|
for gap_blknum in old_nblocks..blknum {
|
|
key.field6 = gap_blknum;
|
|
|
|
if self.shard.get_shard_number(&key) != self.shard.number {
|
|
continue;
|
|
}
|
|
|
|
modification.put_rel_page_image(rel, gap_blknum, ZERO_PAGE.clone())?;
|
|
}
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
async fn put_slru_page_image(
|
|
&mut self,
|
|
modification: &mut DatadirModification<'_>,
|
|
kind: SlruKind,
|
|
segno: u32,
|
|
blknum: BlockNumber,
|
|
img: Bytes,
|
|
ctx: &RequestContext,
|
|
) -> Result<()> {
|
|
self.handle_slru_extend(modification, kind, segno, blknum, ctx)
|
|
.await?;
|
|
modification.put_slru_page_image(kind, segno, blknum, img)?;
|
|
Ok(())
|
|
}
|
|
|
|
async fn handle_slru_extend(
|
|
&mut self,
|
|
modification: &mut DatadirModification<'_>,
|
|
kind: SlruKind,
|
|
segno: u32,
|
|
blknum: BlockNumber,
|
|
ctx: &RequestContext,
|
|
) -> anyhow::Result<()> {
|
|
// we don't use a cache for this like we do for relations. SLRUS are explcitly
|
|
// extended with ZEROPAGE records, not with commit records, so it happens
|
|
// a lot less frequently.
|
|
|
|
let new_nblocks = blknum + 1;
|
|
// Check if the relation exists. We implicitly create relations on first
|
|
// record.
|
|
// TODO: would be nice if to be more explicit about it
|
|
let old_nblocks = if !modification
|
|
.tline
|
|
.get_slru_segment_exists(kind, segno, Version::Modified(modification), ctx)
|
|
.await?
|
|
{
|
|
// create it with 0 size initially, the logic below will extend it
|
|
modification
|
|
.put_slru_segment_creation(kind, segno, 0, ctx)
|
|
.await?;
|
|
0
|
|
} else {
|
|
modification
|
|
.tline
|
|
.get_slru_segment_size(kind, segno, Version::Modified(modification), ctx)
|
|
.await?
|
|
};
|
|
|
|
if new_nblocks > old_nblocks {
|
|
trace!(
|
|
"extending SLRU {:?} seg {} from {} to {} blocks",
|
|
kind,
|
|
segno,
|
|
old_nblocks,
|
|
new_nblocks
|
|
);
|
|
modification.put_slru_extend(kind, segno, new_nblocks)?;
|
|
|
|
// fill the gap with zeros
|
|
for gap_blknum in old_nblocks..blknum {
|
|
modification.put_slru_page_image(kind, segno, gap_blknum, ZERO_PAGE.clone())?;
|
|
}
|
|
}
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
async fn get_relsize(
|
|
modification: &DatadirModification<'_>,
|
|
rel: RelTag,
|
|
ctx: &RequestContext,
|
|
) -> anyhow::Result<BlockNumber> {
|
|
let nblocks = if !modification
|
|
.tline
|
|
.get_rel_exists(rel, Version::Modified(modification), ctx)
|
|
.await?
|
|
{
|
|
0
|
|
} else {
|
|
modification
|
|
.tline
|
|
.get_rel_size(rel, Version::Modified(modification), ctx)
|
|
.await?
|
|
};
|
|
Ok(nblocks)
|
|
}
|
|
|
|
#[allow(clippy::bool_assert_comparison)]
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use super::*;
|
|
use crate::tenant::harness::*;
|
|
use crate::tenant::remote_timeline_client::{remote_initdb_archive_path, INITDB_PATH};
|
|
use postgres_ffi::RELSEG_SIZE;
|
|
|
|
use crate::DEFAULT_PG_VERSION;
|
|
|
|
/// Arbitrary relation tag, for testing.
|
|
const TESTREL_A: RelTag = RelTag {
|
|
spcnode: 0,
|
|
dbnode: 111,
|
|
relnode: 1000,
|
|
forknum: 0,
|
|
};
|
|
|
|
fn assert_current_logical_size(_timeline: &Timeline, _lsn: Lsn) {
|
|
// TODO
|
|
}
|
|
|
|
static ZERO_CHECKPOINT: Bytes = Bytes::from_static(&[0u8; SIZEOF_CHECKPOINT]);
|
|
|
|
async fn init_walingest_test(tline: &Timeline, ctx: &RequestContext) -> Result<WalIngest> {
|
|
let mut m = tline.begin_modification(Lsn(0x10));
|
|
m.put_checkpoint(ZERO_CHECKPOINT.clone())?;
|
|
m.put_relmap_file(0, 111, Bytes::from(""), ctx).await?; // dummy relmapper file
|
|
m.commit(ctx).await?;
|
|
let walingest = WalIngest::new(tline, Lsn(0x10), ctx).await?;
|
|
|
|
Ok(walingest)
|
|
}
|
|
|
|
    #[tokio::test]
    async fn test_relsize() -> Result<()> {
        let (tenant, ctx) = TenantHarness::create("test_relsize")?.load().await;
        let tline = tenant
            .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
            .await?;
        let mut walingest = init_walingest_test(&tline, &ctx).await?;

        let mut m = tline.begin_modification(Lsn(0x20));
        walingest.put_rel_creation(&mut m, TESTREL_A, &ctx).await?;
        walingest
            .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 2"), &ctx)
            .await?;
        m.commit(&ctx).await?;
        let mut m = tline.begin_modification(Lsn(0x30));
        walingest
            .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 3"), &ctx)
            .await?;
        m.commit(&ctx).await?;
        let mut m = tline.begin_modification(Lsn(0x40));
        walingest
            .put_rel_page_image(&mut m, TESTREL_A, 1, test_img("foo blk 1 at 4"), &ctx)
            .await?;
        m.commit(&ctx).await?;
        let mut m = tline.begin_modification(Lsn(0x50));
        walingest
            .put_rel_page_image(&mut m, TESTREL_A, 2, test_img("foo blk 2 at 5"), &ctx)
            .await?;
        m.commit(&ctx).await?;

        assert_current_logical_size(&tline, Lsn(0x50));

        // The relation was created at LSN 0x20; it is not visible at LSN 0x10 yet.
        assert_eq!(
            tline
                .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
                .await?,
            false
        );
        assert!(tline
            .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
            .await
            .is_err());
        assert_eq!(
            tline
                .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
                .await?,
            true
        );
        assert_eq!(
            tline
                .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
                .await?,
            1
        );
        assert_eq!(
            tline
                .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
                .await?,
            3
        );

        // Check page contents at each LSN
        assert_eq!(
            tline
                .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x20)), &ctx)
                .await?,
            test_img("foo blk 0 at 2")
        );

        assert_eq!(
            tline
                .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x30)), &ctx)
                .await?,
            test_img("foo blk 0 at 3")
        );

        assert_eq!(
            tline
                .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x40)), &ctx)
                .await?,
            test_img("foo blk 0 at 3")
        );
        assert_eq!(
            tline
                .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x40)), &ctx)
                .await?,
            test_img("foo blk 1 at 4")
        );

        assert_eq!(
            tline
                .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x50)), &ctx)
                .await?,
            test_img("foo blk 0 at 3")
        );
        assert_eq!(
            tline
                .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x50)), &ctx)
                .await?,
            test_img("foo blk 1 at 4")
        );
        assert_eq!(
            tline
                .get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), &ctx)
                .await?,
            test_img("foo blk 2 at 5")
        );

        // Truncate last block
        let mut m = tline.begin_modification(Lsn(0x60));
        walingest
            .put_rel_truncation(&mut m, TESTREL_A, 2, &ctx)
            .await?;
        m.commit(&ctx).await?;
        assert_current_logical_size(&tline, Lsn(0x60));

        // Check reported size and contents after truncation
        assert_eq!(
            tline
                .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), &ctx)
                .await?,
            2
        );
        assert_eq!(
            tline
                .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x60)), &ctx)
                .await?,
            test_img("foo blk 0 at 3")
        );
        assert_eq!(
            tline
                .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x60)), &ctx)
                .await?,
            test_img("foo blk 1 at 4")
        );

        // should still see the truncated block with older LSN
        assert_eq!(
            tline
                .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
                .await?,
            3
        );
        assert_eq!(
            tline
                .get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), &ctx)
                .await?,
            test_img("foo blk 2 at 5")
        );

        // Truncate to zero length
        let mut m = tline.begin_modification(Lsn(0x68));
        walingest
            .put_rel_truncation(&mut m, TESTREL_A, 0, &ctx)
            .await?;
        m.commit(&ctx).await?;
        assert_eq!(
            tline
                .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x68)), &ctx)
                .await?,
            0
        );

        // Extend from 0 to 2 blocks, leaving a gap
        let mut m = tline.begin_modification(Lsn(0x70));
        walingest
            .put_rel_page_image(&mut m, TESTREL_A, 1, test_img("foo blk 1"), &ctx)
            .await?;
        m.commit(&ctx).await?;
        assert_eq!(
            tline
                .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x70)), &ctx)
                .await?,
            2
        );
        assert_eq!(
            tline
                .get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x70)), &ctx)
                .await?,
            ZERO_PAGE
        );
        assert_eq!(
            tline
                .get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x70)), &ctx)
                .await?,
            test_img("foo blk 1")
        );

        // Extend a lot more, leaving a big gap that spans across segments
        let mut m = tline.begin_modification(Lsn(0x80));
        walingest
            .put_rel_page_image(&mut m, TESTREL_A, 1500, test_img("foo blk 1500"), &ctx)
            .await?;
        m.commit(&ctx).await?;
        assert_eq!(
            tline
                .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
                .await?,
            1501
        );
        for blk in 2..1500 {
            assert_eq!(
                tline
                    .get_rel_page_at_lsn(TESTREL_A, blk, Version::Lsn(Lsn(0x80)), &ctx)
                    .await?,
                ZERO_PAGE
            );
        }
        assert_eq!(
            tline
                .get_rel_page_at_lsn(TESTREL_A, 1500, Version::Lsn(Lsn(0x80)), &ctx)
                .await?,
            test_img("foo blk 1500")
        );

        Ok(())
    }

    // Test what happens if we drop a relation
    // and then re-create it within the same layer.
    #[tokio::test]
    async fn test_drop_extend() -> Result<()> {
        let (tenant, ctx) = TenantHarness::create("test_drop_extend")?.load().await;
        let tline = tenant
            .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
            .await?;
        let mut walingest = init_walingest_test(&tline, &ctx).await?;

        let mut m = tline.begin_modification(Lsn(0x20));
        walingest
            .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 2"), &ctx)
            .await?;
        m.commit(&ctx).await?;

        // Check that rel exists and size is correct
        assert_eq!(
            tline
                .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
                .await?,
            true
        );
        assert_eq!(
            tline
                .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
                .await?,
            1
        );

        // Drop rel
        let mut m = tline.begin_modification(Lsn(0x30));
        walingest.put_rel_drop(&mut m, TESTREL_A, &ctx).await?;
        m.commit(&ctx).await?;

        // Check that rel is not visible anymore
        assert_eq!(
            tline
                .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x30)), &ctx)
                .await?,
            false
        );

        // FIXME: should fail
        //assert!(tline.get_rel_size(TESTREL_A, Lsn(0x30), false)?.is_none());

        // Re-create it
        let mut m = tline.begin_modification(Lsn(0x40));
        walingest
            .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 4"), &ctx)
            .await?;
        m.commit(&ctx).await?;

        // Check that rel exists and size is correct
        assert_eq!(
            tline
                .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x40)), &ctx)
                .await?,
            true
        );
        assert_eq!(
            tline
                .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x40)), &ctx)
                .await?,
            1
        );

        Ok(())
    }

    // Test what happens if we truncate a relation
    // so that one of its segments is dropped
    // and then extend it again within the same layer.
    #[tokio::test]
    async fn test_truncate_extend() -> Result<()> {
        let (tenant, ctx) = TenantHarness::create("test_truncate_extend")?.load().await;
        let tline = tenant
            .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
            .await?;
        let mut walingest = init_walingest_test(&tline, &ctx).await?;

        // Create a 20 MB relation (the size is arbitrary)
        let relsize = 20 * 1024 * 1024 / 8192;
        let mut m = tline.begin_modification(Lsn(0x20));
        for blkno in 0..relsize {
            let data = format!("foo blk {} at {}", blkno, Lsn(0x20));
            walingest
                .put_rel_page_image(&mut m, TESTREL_A, blkno, test_img(&data), &ctx)
                .await?;
        }
        m.commit(&ctx).await?;

        // The relation was created at LSN 0x20; it is not visible at LSN 0x10 yet.
        assert_eq!(
            tline
                .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
                .await?,
            false
        );
        assert!(tline
            .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
            .await
            .is_err());

        assert_eq!(
            tline
                .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
                .await?,
            true
        );
        assert_eq!(
            tline
                .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
                .await?,
            relsize
        );

        // Check relation content
        for blkno in 0..relsize {
            let lsn = Lsn(0x20);
            let data = format!("foo blk {} at {}", blkno, lsn);
            assert_eq!(
                tline
                    .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(lsn), &ctx)
                    .await?,
                test_img(&data)
            );
        }

        // Truncate the relation so that its second segment is dropped
        // - only leave one page
        let mut m = tline.begin_modification(Lsn(0x60));
        walingest
            .put_rel_truncation(&mut m, TESTREL_A, 1, &ctx)
            .await?;
        m.commit(&ctx).await?;

        // Check reported size and contents after truncation
        assert_eq!(
            tline
                .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), &ctx)
                .await?,
            1
        );

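        // Only the first block remains after the truncation; its contents
        // should be unchanged.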
        for blkno in 0..1 {
            let lsn = Lsn(0x20);
            let data = format!("foo blk {} at {}", blkno, lsn);
            assert_eq!(
                tline
                    .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x60)), &ctx)
                    .await?,
                test_img(&data)
            );
        }

        // should still see all blocks with older LSN
        assert_eq!(
            tline
                .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
                .await?,
            relsize
        );
        for blkno in 0..relsize {
            let lsn = Lsn(0x20);
            let data = format!("foo blk {} at {}", blkno, lsn);
            assert_eq!(
                tline
                    .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x50)), &ctx)
                    .await?,
                test_img(&data)
            );
        }

        // Extend the relation again.
        // Add enough blocks to create a second segment
        let lsn = Lsn(0x80);
        let mut m = tline.begin_modification(lsn);
        for blkno in 0..relsize {
            let data = format!("foo blk {} at {}", blkno, lsn);
            walingest
                .put_rel_page_image(&mut m, TESTREL_A, blkno, test_img(&data), &ctx)
                .await?;
        }
        m.commit(&ctx).await?;

        assert_eq!(
            tline
                .get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
                .await?,
            true
        );
        assert_eq!(
            tline
                .get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
                .await?,
            relsize
        );
        // Check relation content
        for blkno in 0..relsize {
            let lsn = Lsn(0x80);
            let data = format!("foo blk {} at {}", blkno, lsn);
            assert_eq!(
                tline
                    .get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x80)), &ctx)
                    .await?,
                test_img(&data)
            );
        }

        Ok(())
    }

    /// Test get_relsize() and truncation with a file larger than 1 GB, so that it's
    /// split into multiple 1 GB segments in Postgres.
    #[tokio::test]
    async fn test_large_rel() -> Result<()> {
        let (tenant, ctx) = TenantHarness::create("test_large_rel")?.load().await;
        let tline = tenant
            .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
            .await?;
        let mut walingest = init_walingest_test(&tline, &ctx).await?;

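        // Write RELSEG_SIZE + 1 blocks, one per WAL record, so that the
        // relation spills over into a second 1 GB segment.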
        let mut lsn = 0x10;
        for blknum in 0..RELSEG_SIZE + 1 {
            lsn += 0x10;
            let mut m = tline.begin_modification(Lsn(lsn));
            let img = test_img(&format!("foo blk {} at {}", blknum, Lsn(lsn)));
            walingest
                .put_rel_page_image(&mut m, TESTREL_A, blknum as BlockNumber, img, &ctx)
                .await?;
            m.commit(&ctx).await?;
        }

        assert_current_logical_size(&tline, Lsn(lsn));

        assert_eq!(
            tline
                .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
                .await?,
            RELSEG_SIZE + 1
        );

        // Truncate one block
        lsn += 0x10;
        let mut m = tline.begin_modification(Lsn(lsn));
        walingest
            .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE, &ctx)
            .await?;
        m.commit(&ctx).await?;
        assert_eq!(
            tline
                .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
                .await?,
            RELSEG_SIZE
        );
        assert_current_logical_size(&tline, Lsn(lsn));

        // Truncate another block
        lsn += 0x10;
        let mut m = tline.begin_modification(Lsn(lsn));
        walingest
            .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE - 1, &ctx)
            .await?;
        m.commit(&ctx).await?;
        assert_eq!(
            tline
                .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
                .await?,
            RELSEG_SIZE - 1
        );
        assert_current_logical_size(&tline, Lsn(lsn));

        // Truncate to 3000 blocks, and then truncate all the way down to 0,
        // one block at a time. This tests the behavior at segment boundaries.
        let mut size: i32 = 3000;
        while size >= 0 {
            lsn += 0x10;
            let mut m = tline.begin_modification(Lsn(lsn));
            walingest
                .put_rel_truncation(&mut m, TESTREL_A, size as BlockNumber, &ctx)
                .await?;
            m.commit(&ctx).await?;
            assert_eq!(
                tline
                    .get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
                    .await?,
                size as BlockNumber
            );

            size -= 1;
        }
        assert_current_logical_size(&tline, Lsn(lsn));

        Ok(())
    }

    /// Replay a wal segment file taken directly from safekeepers.
    ///
    /// This test is useful for benchmarking since it allows us to profile only
    /// the walingest code in a single-threaded executor, and iterate more quickly
    /// without waiting for unrelated steps.
    #[tokio::test]
    async fn test_ingest_real_wal() {
        use crate::tenant::harness::*;
        use postgres_ffi::waldecoder::WalStreamDecoder;
        use postgres_ffi::WAL_SEGMENT_SIZE;

        // Define test data path and constants.
        //
        // Steps to reconstruct the data, if needed:
        // 1. Run the pgbench python test
        // 2. Take the first wal segment file from safekeeper
        // 3. Compress it using `zstd --long input_file`
        // 4. Copy initdb.tar.zst from local_fs_remote_storage
        // 5. Grep sk logs for "restart decoder" to get startpoint
        // 6. Run just the decoder from this test to get the endpoint.
        //    It's the last LSN the decoder will output.
        let pg_version = 15; // The test data was generated by pg15
        let path = "test_data/sk_wal_segment_from_pgbench";
        let wal_segment_path = format!("{path}/000000010000000000000001.zst");
        let source_initdb_path = format!("{path}/{INITDB_PATH}");
        let startpoint = Lsn::from_hex("14AEC08").unwrap();
        let _endpoint = Lsn::from_hex("1FFFF98").unwrap();

        let harness = TenantHarness::create("test_ingest_real_wal").unwrap();
        let (tenant, ctx) = harness.load().await;

        let remote_initdb_path =
            remote_initdb_archive_path(&tenant.tenant_shard_id().tenant_id, &TIMELINE_ID);
        let initdb_path = harness.remote_fs_dir.join(remote_initdb_path.get_path());

        std::fs::create_dir_all(initdb_path.parent().unwrap())
            .expect("creating test dir should work");
        std::fs::copy(source_initdb_path, initdb_path).expect("copying the initdb.tar.zst works");

        // Bootstrap a real timeline. We can't use create_test_timeline because
        // it doesn't create a real checkpoint, and WalIngest::new tries to parse
        // the garbage data.
        let tline = tenant
            .bootstrap_timeline_test(TIMELINE_ID, pg_version, Some(TIMELINE_ID), &ctx)
            .await
            .unwrap();

        // We fully read and decompress this into memory before decoding
        // to get a more accurate perf profile of the decoder.
        let bytes = {
            use async_compression::tokio::bufread::ZstdDecoder;
            let file = tokio::fs::File::open(wal_segment_path).await.unwrap();
            let reader = tokio::io::BufReader::new(file);
            let decoder = ZstdDecoder::new(reader);
            let mut reader = tokio::io::BufReader::new(decoder);
            let mut buffer = Vec::new();
            tokio::io::copy_buf(&mut reader, &mut buffer).await.unwrap();
            buffer
        };

        // TODO start a profiler too
        let started_at = std::time::Instant::now();

        // Initialize walingest
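        // `startpoint` doesn't fall on a segment boundary, so skip the bytes
        // that precede it within the segment file before decoding.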
        let xlogoff: usize = startpoint.segment_offset(WAL_SEGMENT_SIZE);
        let mut decoder = WalStreamDecoder::new(startpoint, pg_version);
        let mut walingest = WalIngest::new(tline.as_ref(), startpoint, &ctx)
            .await
            .unwrap();
        let mut modification = tline.begin_modification(startpoint);
        let mut decoded = DecodedWALRecord::default();
        println!("decoding {} bytes", bytes.len() - xlogoff);

        // Decode and ingest wal. We process the wal in chunks because
        // that's what happens when we get bytes from safekeepers.
        for chunk in bytes[xlogoff..].chunks(50) {
            decoder.feed_bytes(chunk);
            while let Some((lsn, recdata)) = decoder.poll_decode().unwrap() {
                walingest
                    .ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx)
                    .await
                    .unwrap();
            }
            modification.commit(&ctx).await.unwrap();
        }

        let duration = started_at.elapsed();
        println!("done in {:?}", duration);
    }
}