//!
//! Parse PostgreSQL WAL records and store them in a neon Timeline.
//!
//! The pipeline for ingesting WAL looks like this:
//!
//! WAL receiver -> [`wal_decoder`] -> WalIngest -> Repository
//!
//! The WAL receiver receives a stream of WAL from the WAL safekeepers.
//! Records get decoded and interpreted in the [`wal_decoder`] module
//! and then stored to the Repository by WalIngest.
//!
//! The neon Repository can store page versions in two formats: as
//! page images, or as WAL records. [`wal_decoder::models::InterpretedWalRecord::from_bytes_filtered`]
//! extracts page images out of some WAL records, but mostly it's WAL
//! records. If a WAL record modifies multiple pages, WalIngest
//! will call Repository::put_rel_wal_record or put_rel_page_image functions
//! separately for each modified page.
//!
//! To reconstruct a page using a WAL record, the Repository calls the
//! code in walredo.rs. walredo.rs passes most WAL records to the WAL
//! redo Postgres process, but some records it can handle directly with
//! bespoke Rust code.
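//!
//! As a rough usage sketch (hedged: the real call sites live in the WAL
//! receiver; `timeline`, `startpoint`, `lsn`, `interpreted`, and `ctx` are
//! assumed to be set up elsewhere, as in the tests at the bottom of this file):
//!
//! ```ignore
//! let mut walingest = WalIngest::new(&timeline, startpoint, &ctx).await?;
//! let mut modification = timeline.begin_modification(lsn);
//! walingest.ingest_record(interpreted, &mut modification, &ctx).await?;
//! modification.commit(&ctx).await?;
//! ```
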
use std::backtrace::Backtrace;
use std::collections::HashMap;
use std::sync::{Arc, OnceLock};
use std::time::{Duration, Instant, SystemTime};

use bytes::{Buf, Bytes};
use pageserver_api::key::{Key, rel_block_to_key};
use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
use pageserver_api::shard::ShardIdentity;
use postgres_ffi::walrecord::*;
use postgres_ffi::{
    PgMajorVersion, TransactionId, dispatch_pgversion, enum_pgversion, enum_pgversion_dispatch,
    fsm_logical_to_physical, pg_constants,
};
use postgres_ffi_types::TimestampTz;
use postgres_ffi_types::forknum::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
use tracing::*;
use utils::bin_ser::{DeserializeError, SerializeError};
use utils::lsn::Lsn;
use utils::rate_limit::RateLimit;
use utils::{critical_timeline, failpoint_support};
use wal_decoder::models::record::NeonWalRecord;
use wal_decoder::models::*;

use crate::ZERO_PAGE;
use crate::context::RequestContext;
use crate::metrics::WAL_INGEST;
use crate::pgdatadir_mapping::{DatadirModification, Version};
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::tenant::{PageReconstructError, Timeline};

enum_pgversion! {CheckPoint, pgv::CheckPoint}

impl CheckPoint {
    fn encode(&self) -> Result<Bytes, SerializeError> {
        enum_pgversion_dispatch!(self, CheckPoint, cp, { cp.encode() })
    }

    fn update_next_xid(&mut self, xid: u32) -> bool {
        enum_pgversion_dispatch!(self, CheckPoint, cp, { cp.update_next_xid(xid) })
    }

    pub fn update_next_multixid(&mut self, multi_xid: u32, multi_offset: u32) -> bool {
        enum_pgversion_dispatch!(self, CheckPoint, cp, {
            cp.update_next_multixid(multi_xid, multi_offset)
        })
    }
}

/// Temporary suppression of WAL lag warnings after attach.
///
/// After tenant attach, we want to limit WAL lag warnings because
/// we don't look at the WAL until the attach is complete, which
/// might take a while.
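///
/// Worked example for the formula in `new()` below (numbers are
/// illustrative): if the attach took 10 minutes, warnings are suppressed
/// until 3 * 10 + 2 = 32 minutes after attach start, and only for lags up
/// to 2 * 10 + 1 = 21 minutes; larger lags are still reported.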
pub struct WalLagCooldown {
    /// Until when this suppression applies at all.
    active_until: std::time::Instant,
    /// The maximum lag to suppress. Lags above this limit get reported anyway.
    max_lag: Duration,
}

impl WalLagCooldown {
    pub fn new(attach_start: Instant, attach_duration: Duration) -> Self {
        Self {
            active_until: attach_start + attach_duration * 3 + Duration::from_secs(120),
            max_lag: attach_duration * 2 + Duration::from_secs(60),
        }
    }
}

pub struct WalIngest {
    attach_wal_lag_cooldown: Arc<OnceLock<WalLagCooldown>>,
    shard: ShardIdentity,
    checkpoint: CheckPoint,
    checkpoint_modified: bool,
    warn_ingest_lag: WarnIngestLag,
}

struct WarnIngestLag {
    lag_msg_ratelimit: RateLimit,
    future_lsn_msg_ratelimit: RateLimit,
    timestamp_invalid_msg_ratelimit: RateLimit,
}

pub struct WalIngestError {
    pub backtrace: std::backtrace::Backtrace,
    pub kind: WalIngestErrorKind,
}

#[derive(thiserror::Error, Debug)]
pub enum WalIngestErrorKind {
    #[error(transparent)]
    #[allow(private_interfaces)]
    PageReconstructError(#[from] PageReconstructError),
    #[error(transparent)]
    DeserializationFailure(#[from] DeserializeError),
    #[error(transparent)]
    SerializationFailure(#[from] SerializeError),
    #[error("the request contains data not supported by pageserver: {0} @ {1}")]
    InvalidKey(Key, Lsn),
    #[error("twophase file for xid {0} already exists")]
    FileAlreadyExists(u64),
    #[error("slru segment {0:?}/{1} already exists")]
    SlruAlreadyExists(SlruKind, u32),
    #[error("relation already exists")]
    RelationAlreadyExists(RelTag),
    #[error("invalid reldir key {0}")]
    InvalidRelDirKey(Key),

    #[error(transparent)]
    LogicalError(anyhow::Error),
    #[error(transparent)]
    EncodeAuxFileError(anyhow::Error),
    #[error(transparent)]
    MaybeRelSizeV2Error(anyhow::Error),

    #[error("timeline shutting down")]
    Cancelled,
}

impl<T> From<T> for WalIngestError
where
    WalIngestErrorKind: From<T>,
{
    fn from(value: T) -> Self {
        WalIngestError {
            backtrace: Backtrace::capture(),
            kind: WalIngestErrorKind::from(value),
        }
    }
}

impl std::error::Error for WalIngestError {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        self.kind.source()
    }
}

impl core::fmt::Display for WalIngestError {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        self.kind.fmt(f)
    }
}

impl core::fmt::Debug for WalIngestError {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        if f.alternate() {
            f.debug_map()
                .key(&"backtrace")
                .value(&self.backtrace)
                .key(&"kind")
                .value(&self.kind)
                .finish()
        } else {
            writeln!(f, "Error: {:?}", self.kind)?;
            if self.backtrace.status() == std::backtrace::BacktraceStatus::Captured {
                writeln!(f, "Stack backtrace: {:?}", self.backtrace)?;
            }
            Ok(())
        }
    }
}

#[macro_export]
macro_rules! ensure_walingest {
    ($($t:tt)*) => {
        _ = || -> Result<(), anyhow::Error> {
            anyhow::ensure!($($t)*);
            Ok(())
        }().map_err(WalIngestErrorKind::LogicalError)?;
    };
}

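// Illustrative use of the macro above (the condition and message are
// hypothetical): `ensure_walingest!(blknum < nblocks, "blkno out of range");`
// On failure this yields a WalIngestErrorKind::LogicalError, which the `?`
// converts into a WalIngestError with a captured backtrace.
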
impl WalIngest {
    pub async fn new(
        timeline: &Timeline,
        startpoint: Lsn,
        ctx: &RequestContext,
    ) -> Result<WalIngest, WalIngestError> {
        // Fetch the latest checkpoint into memory, so that we can compare with it
        // quickly in `ingest_record` and update it when it changes.
        let checkpoint_bytes = timeline.get_checkpoint(startpoint, ctx).await?;
        let pgversion = timeline.pg_version;

        let checkpoint = dispatch_pgversion!(pgversion, {
            let checkpoint = pgv::CheckPoint::decode(&checkpoint_bytes)?;
            trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);
            <pgv::CheckPoint as Into<CheckPoint>>::into(checkpoint)
        });

        Ok(WalIngest {
            shard: *timeline.get_shard_identity(),
            checkpoint,
            checkpoint_modified: false,
            attach_wal_lag_cooldown: timeline.attach_wal_lag_cooldown.clone(),
            warn_ingest_lag: WarnIngestLag {
                lag_msg_ratelimit: RateLimit::new(std::time::Duration::from_secs(10)),
                future_lsn_msg_ratelimit: RateLimit::new(std::time::Duration::from_secs(10)),
                timestamp_invalid_msg_ratelimit: RateLimit::new(std::time::Duration::from_secs(10)),
            },
        })
    }

    /// Ingest an interpreted PostgreSQL WAL record by doing writes to the underlying key value
    /// storage of a given timeline.
    ///
    /// This function updates the `lsn` field of the `DatadirModification`.
    ///
    /// This function returns `true` if the record was ingested, and `false` if it was filtered out.
    pub async fn ingest_record(
        &mut self,
        interpreted: InterpretedWalRecord,
        modification: &mut DatadirModification<'_>,
        ctx: &RequestContext,
    ) -> Result<bool, WalIngestError> {
        WAL_INGEST.records_received.inc();
        let prev_len = modification.len();

        modification.set_lsn(interpreted.next_record_lsn)?;

        if matches!(interpreted.flush_uncommitted, FlushUncommittedRecords::Yes) {
            // Records of this type should always be preceded by a commit(), as they
            // rely on reading data pages back from the Timeline.
            assert!(!modification.has_dirty_data());
        }

        assert!(!self.checkpoint_modified);
        if interpreted.xid != pg_constants::INVALID_TRANSACTION_ID
            && self.checkpoint.update_next_xid(interpreted.xid)
        {
            self.checkpoint_modified = true;
        }

        failpoint_support::sleep_millis_async!("wal-ingest-record-sleep");

        match interpreted.metadata_record {
            Some(MetadataRecord::Heapam(rec)) => match rec {
                HeapamRecord::ClearVmBits(clear_vm_bits) => {
                    self.ingest_clear_vm_bits(clear_vm_bits, modification, ctx)
                        .await?;
                }
            },
            Some(MetadataRecord::Neonrmgr(rec)) => match rec {
                NeonrmgrRecord::ClearVmBits(clear_vm_bits) => {
                    self.ingest_clear_vm_bits(clear_vm_bits, modification, ctx)
                        .await?;
                }
            },
            Some(MetadataRecord::Smgr(rec)) => match rec {
                SmgrRecord::Create(create) => {
                    self.ingest_xlog_smgr_create(create, modification, ctx)
                        .await?;
                }
                SmgrRecord::Truncate(truncate) => {
                    self.ingest_xlog_smgr_truncate(truncate, modification, ctx)
                        .await?;
                }
            },
            Some(MetadataRecord::Dbase(rec)) => match rec {
                DbaseRecord::Create(create) => {
                    self.ingest_xlog_dbase_create(create, modification, ctx)
                        .await?;
                }
                DbaseRecord::Drop(drop) => {
                    self.ingest_xlog_dbase_drop(drop, modification, ctx).await?;
                }
            },
            Some(MetadataRecord::Clog(rec)) => match rec {
                ClogRecord::ZeroPage(zero_page) => {
                    self.ingest_clog_zero_page(zero_page, modification, ctx)
                        .await?;
                }
                ClogRecord::Truncate(truncate) => {
                    self.ingest_clog_truncate(truncate, modification, ctx)
                        .await?;
                }
            },
            Some(MetadataRecord::Xact(rec)) => {
                self.ingest_xact_record(rec, modification, ctx).await?;
            }
            Some(MetadataRecord::MultiXact(rec)) => match rec {
                MultiXactRecord::ZeroPage(zero_page) => {
                    self.ingest_multixact_zero_page(zero_page, modification, ctx)
                        .await?;
                }
                MultiXactRecord::Create(create) => {
                    self.ingest_multixact_create(modification, &create)?;
                }
                MultiXactRecord::Truncate(truncate) => {
                    self.ingest_multixact_truncate(modification, &truncate, ctx)
                        .await?;
                }
            },
            Some(MetadataRecord::Relmap(rec)) => match rec {
                RelmapRecord::Update(update) => {
                    self.ingest_relmap_update(update, modification, ctx).await?;
                }
            },
            Some(MetadataRecord::Xlog(rec)) => match rec {
                XlogRecord::Raw(raw) => {
                    self.ingest_raw_xlog_record(raw, modification, ctx).await?;
                }
            },
            Some(MetadataRecord::LogicalMessage(rec)) => match rec {
                LogicalMessageRecord::Put(put) => {
                    self.ingest_logical_message_put(put, modification, ctx)
                        .await?;
                }
                #[cfg(feature = "testing")]
                LogicalMessageRecord::Failpoint => {
                    // This is a convenient way to make the WAL ingestion pause at a
                    // particular point in the WAL. For more fine-grained control,
                    // we could peek into the message and only pause if it contains
                    // a particular string, for example, but this is enough for now.
                    failpoint_support::sleep_millis_async!(
                        "pageserver-wal-ingest-logical-message-sleep"
                    );
                }
            },
            Some(MetadataRecord::Standby(rec)) => {
                self.ingest_standby_record(rec).unwrap();
            }
            Some(MetadataRecord::Replorigin(rec)) => {
                self.ingest_replorigin_record(rec, modification).await?;
            }
            None => {
                // There are two cases through which we end up here:
                // 1. The resource manager for the original PG WAL record
                //    is [`pg_constants::RM_TBLSPC_ID`]. This is not a supported
                //    record type within Neon.
                // 2. The resource manager id was unknown to
                //    [`wal_decoder::decoder::MetadataRecord::from_decoded`].
                // TODO(vlad): Tighten this up more once we build confidence
                // that case (2) does not happen in the field.
            }
        }

        modification
            .ingest_batch(interpreted.batch, &self.shard, ctx)
            .await?;

        // If checkpoint data was updated, store the new version in the repository
        if self.checkpoint_modified {
            let new_checkpoint_bytes = self.checkpoint.encode()?;

            modification.put_checkpoint(new_checkpoint_bytes)?;
            self.checkpoint_modified = false;
        }

        // Note that at this point this record is only cached in the modification
        // until commit() is called to flush the data into the repository and update
        // the latest LSN.

        Ok(modification.len() > prev_len)
    }

    /// This is the same as AdjustToFullTransactionId(xid) in PostgreSQL.
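    ///
    /// Worked example (values are illustrative): if nextXid is
    /// 0x0000_0002_0000_0010 (epoch 2, xid 0x10) and `xid` is 0xFFFF_FFF0,
    /// then `xid > next_xid`, so the XID must come from the previous epoch
    /// and the result is 0x0000_0001_FFFF_FFF0.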
    fn adjust_to_full_transaction_id(&self, xid: TransactionId) -> Result<u64, WalIngestError> {
        let next_full_xid =
            enum_pgversion_dispatch!(&self.checkpoint, CheckPoint, cp, { cp.nextXid.value });

        let next_xid = (next_full_xid) as u32;
        let mut epoch = (next_full_xid >> 32) as u32;

        if xid > next_xid {
            // Wraparound occurred, must be from a prev epoch.
            if epoch == 0 {
                Err(WalIngestErrorKind::LogicalError(anyhow::anyhow!(
                    "apparent XID wraparound with prepared transaction XID {xid}, nextXid is {next_full_xid}"
                )))?;
            }
            epoch -= 1;
        }

        Ok(((epoch as u64) << 32) | xid as u64)
    }

    async fn ingest_clear_vm_bits(
        &mut self,
        clear_vm_bits: ClearVmBits,
        modification: &mut DatadirModification<'_>,
        ctx: &RequestContext,
    ) -> Result<(), WalIngestError> {
        let ClearVmBits {
            new_heap_blkno,
            old_heap_blkno,
            flags,
            vm_rel,
        } = clear_vm_bits;
        // Clear the VM bits if required.
        let mut new_vm_blk = new_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
        let mut old_vm_blk = old_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
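
        // Illustrative mapping (assuming the stock Postgres visibility map
        // layout: two bits per heap block, so 32672 heap blocks per 8 KiB VM
        // page): heap block 70000 maps to VM block 2.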

        // VM bits can only be cleared on the shard(s) owning the VM relation, and must be within
        // its view of the VM relation size. Out of caution, error instead of failing WAL ingestion,
        // as there have historically been cases where PostgreSQL has cleared spurious VM pages. See:
        // https://github.com/neondatabase/neon/pull/10634.
        let Some(vm_size) = get_relsize(modification, vm_rel, ctx).await? else {
            critical_timeline!(
                modification.tline.tenant_shard_id,
                modification.tline.timeline_id,
                "clear_vm_bits for unknown VM relation {vm_rel}"
            );
            return Ok(());
        };
        if let Some(blknum) = new_vm_blk {
            if blknum >= vm_size {
                critical_timeline!(
                    modification.tline.tenant_shard_id,
                    modification.tline.timeline_id,
                    "new_vm_blk {blknum} not in {vm_rel} of size {vm_size}"
                );
                new_vm_blk = None;
            }
        }
        if let Some(blknum) = old_vm_blk {
            if blknum >= vm_size {
                critical_timeline!(
                    modification.tline.tenant_shard_id,
                    modification.tline.timeline_id,
                    "old_vm_blk {blknum} not in {vm_rel} of size {vm_size}"
                );
                old_vm_blk = None;
            }
        }

        if new_vm_blk.is_none() && old_vm_blk.is_none() {
            return Ok(());
        } else if new_vm_blk == old_vm_blk {
            // An UPDATE record that needs to clear the bits for both old and the new page, both of
            // which reside on the same VM page.
            self.put_rel_wal_record(
                modification,
                vm_rel,
                new_vm_blk.unwrap(),
                NeonWalRecord::ClearVisibilityMapFlags {
                    new_heap_blkno,
                    old_heap_blkno,
                    flags,
                },
                ctx,
            )
            .await?;
        } else {
            // Clear VM bits for one heap page, or for two pages that reside on different VM pages.
            if let Some(new_vm_blk) = new_vm_blk {
                self.put_rel_wal_record(
                    modification,
                    vm_rel,
                    new_vm_blk,
                    NeonWalRecord::ClearVisibilityMapFlags {
                        new_heap_blkno,
                        old_heap_blkno: None,
                        flags,
                    },
                    ctx,
                )
                .await?;
            }
            if let Some(old_vm_blk) = old_vm_blk {
                self.put_rel_wal_record(
                    modification,
                    vm_rel,
                    old_vm_blk,
                    NeonWalRecord::ClearVisibilityMapFlags {
                        new_heap_blkno: None,
                        old_heap_blkno,
                        flags,
                    },
                    ctx,
                )
                .await?;
            }
        }
        Ok(())
    }

    /// Subroutine of ingest_record(), to handle an XLOG_DBASE_CREATE record.
    async fn ingest_xlog_dbase_create(
        &mut self,
        create: DbaseCreate,
        modification: &mut DatadirModification<'_>,
        ctx: &RequestContext,
    ) -> Result<(), WalIngestError> {
        let DbaseCreate {
            db_id,
            tablespace_id,
            src_db_id,
            src_tablespace_id,
        } = create;

        let rels = modification
            .tline
            .list_rels(
                src_tablespace_id,
                src_db_id,
                Version::Modified(modification),
                ctx,
            )
            .await?;

        debug!("ingest_xlog_dbase_create: {} rels", rels.len());

        // Copy relfilemap
        let filemap = modification
            .tline
            .get_relmap_file(
                src_tablespace_id,
                src_db_id,
                Version::Modified(modification),
                ctx,
            )
            .await?;
        modification
            .put_relmap_file(tablespace_id, db_id, filemap, ctx)
            .await?;

        let mut num_rels_copied = 0;
        let mut num_blocks_copied = 0;
        for src_rel in rels {
            assert_eq!(src_rel.spcnode, src_tablespace_id);
            assert_eq!(src_rel.dbnode, src_db_id);

            let nblocks = modification
                .tline
                .get_rel_size(src_rel, Version::Modified(modification), ctx)
                .await?;
            let dst_rel = RelTag {
                spcnode: tablespace_id,
                dbnode: db_id,
                relnode: src_rel.relnode,
                forknum: src_rel.forknum,
            };

            modification.put_rel_creation(dst_rel, nblocks, ctx).await?;

            // Copy content
            debug!("copying rel {} to {}, {} blocks", src_rel, dst_rel, nblocks);
            for blknum in 0..nblocks {
                // Sharding:
                //  - src and dst are always on the same shard, because they differ only by dbNode, and
                //    dbNode is not included in the hash inputs for sharding.
                //  - This WAL command is replayed on all shards, but each shard only copies the blocks
                //    that belong to it.
                let src_key = rel_block_to_key(src_rel, blknum);
                if !self.shard.is_key_local(&src_key) {
                    debug!(
                        "Skipping non-local key {} during XLOG_DBASE_CREATE",
                        src_key
                    );
                    continue;
                }
                debug!(
                    "copying block {} from {} ({}) to {}",
                    blknum, src_rel, src_key, dst_rel
                );

                let content = modification
                    .tline
                    .get_rel_page_at_lsn(
                        src_rel,
                        blknum,
                        Version::Modified(modification),
                        ctx,
                        crate::tenant::storage_layer::IoConcurrency::sequential(),
                    )
                    .await?;
                modification.put_rel_page_image(dst_rel, blknum, content)?;
                num_blocks_copied += 1;
            }

            num_rels_copied += 1;
        }

        info!(
            "Created database {}/{}, copied {} blocks in {} rels",
            tablespace_id, db_id, num_blocks_copied, num_rels_copied
        );
        Ok(())
    }

    async fn ingest_xlog_dbase_drop(
        &mut self,
        dbase_drop: DbaseDrop,
        modification: &mut DatadirModification<'_>,
        ctx: &RequestContext,
    ) -> Result<(), WalIngestError> {
        let DbaseDrop {
            db_id,
            tablespace_ids,
        } = dbase_drop;
        for tablespace_id in tablespace_ids {
            trace!("Drop db {}, {}", tablespace_id, db_id);
            modification.drop_dbdir(tablespace_id, db_id, ctx).await?;
        }

        Ok(())
    }

    async fn ingest_xlog_smgr_create(
        &mut self,
        create: SmgrCreate,
        modification: &mut DatadirModification<'_>,
        ctx: &RequestContext,
    ) -> Result<(), WalIngestError> {
        let SmgrCreate { rel } = create;
        self.put_rel_creation(modification, rel, ctx).await?;
        Ok(())
    }

    /// Subroutine of ingest_record(), to handle an XLOG_SMGR_TRUNCATE record.
    ///
    /// This is the same logic as in PostgreSQL's smgr_redo() function.
    async fn ingest_xlog_smgr_truncate(
        &mut self,
        truncate: XlSmgrTruncate,
        modification: &mut DatadirModification<'_>,
        ctx: &RequestContext,
    ) -> Result<(), WalIngestError> {
        let XlSmgrTruncate {
            blkno,
            rnode,
            flags,
        } = truncate;

        let spcnode = rnode.spcnode;
        let dbnode = rnode.dbnode;
        let relnode = rnode.relnode;

        if flags & pg_constants::SMGR_TRUNCATE_HEAP != 0 {
            let rel = RelTag {
                spcnode,
                dbnode,
                relnode,
                forknum: MAIN_FORKNUM,
            };

            self.put_rel_truncation(modification, rel, blkno, ctx)
                .await?;
        }
        if flags & pg_constants::SMGR_TRUNCATE_FSM != 0 {
            let rel = RelTag {
                spcnode,
                dbnode,
                relnode,
                forknum: FSM_FORKNUM,
            };

            // Zero out the last remaining FSM page, if this shard owns it. We are not precise here,
            // and instead of digging into the FSM bitmap format we just clear the whole page.
            let fsm_logical_page_no = blkno / pg_constants::SLOTS_PER_FSM_PAGE;
            let mut fsm_physical_page_no = fsm_logical_to_physical(fsm_logical_page_no);
            if blkno % pg_constants::SLOTS_PER_FSM_PAGE != 0
                && self
                    .shard
                    .is_key_local(&rel_block_to_key(rel, fsm_physical_page_no))
            {
                modification.put_rel_page_image_zero(rel, fsm_physical_page_no)?;
                fsm_physical_page_no += 1;
            }
            // Truncate this shard's view of the FSM relation size, if it even has one.
            let nblocks = get_relsize(modification, rel, ctx).await?.unwrap_or(0);
            if nblocks > fsm_physical_page_no {
                self.put_rel_truncation(modification, rel, fsm_physical_page_no, ctx)
                    .await?;
            }
        }
        if flags & pg_constants::SMGR_TRUNCATE_VM != 0 {
            let rel = RelTag {
                spcnode,
                dbnode,
                relnode,
                forknum: VISIBILITYMAP_FORKNUM,
            };

            // last remaining block, byte, and bit
            let mut vm_page_no = blkno / (pg_constants::VM_HEAPBLOCKS_PER_PAGE as u32);
            let trunc_byte = blkno as usize % pg_constants::VM_HEAPBLOCKS_PER_PAGE
                / pg_constants::VM_HEAPBLOCKS_PER_BYTE;
            let trunc_offs = blkno as usize % pg_constants::VM_HEAPBLOCKS_PER_BYTE
                * pg_constants::VM_BITS_PER_HEAPBLOCK;
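
            // Worked example for the arithmetic above (assuming the usual
            // constants: 4 heap blocks per VM byte, 2 bits per heap block,
            // 32672 heap blocks per VM page): truncating to heap block
            // 100000 gives vm_page_no 3, trunc_byte 496, trunc_offs 0.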

            // Unless the new size is exactly at a visibility map page boundary, the
            // tail bits in the last remaining map page, representing truncated heap
            // blocks, need to be cleared. This is not only tidy, but also necessary
            // because we don't get a chance to clear the bits if the heap is extended
            // again. Only do this on the shard that owns the page.
            if (trunc_byte != 0 || trunc_offs != 0)
                && self.shard.is_key_local(&rel_block_to_key(rel, vm_page_no))
            {
                modification.put_rel_wal_record(
                    rel,
                    vm_page_no,
                    NeonWalRecord::TruncateVisibilityMap {
                        trunc_byte,
                        trunc_offs,
                    },
                )?;
                vm_page_no += 1;
            }
            // Truncate this shard's view of the VM relation size, if it even has one.
            let nblocks = get_relsize(modification, rel, ctx).await?.unwrap_or(0);
            if nblocks > vm_page_no {
                self.put_rel_truncation(modification, rel, vm_page_no, ctx)
                    .await?;
            }
        }
        Ok(())
    }

    fn warn_on_ingest_lag(
        &mut self,
        conf: &crate::config::PageServerConf,
        wal_timestamp: TimestampTz,
    ) {
        debug_assert_current_span_has_tenant_and_timeline_id();
        let now = SystemTime::now();
        let rate_limits = &mut self.warn_ingest_lag;

        let ts = enum_pgversion_dispatch!(&self.checkpoint, CheckPoint, _cp, {
            pgv::xlog_utils::try_from_pg_timestamp(wal_timestamp)
        });

        match ts {
            Ok(ts) => {
                match now.duration_since(ts) {
                    Ok(lag) => {
                        if lag > conf.wait_lsn_timeout {
                            rate_limits.lag_msg_ratelimit.call2(|rate_limit_stats| {
                                if let Some(cooldown) = self.attach_wal_lag_cooldown.get() {
                                    if std::time::Instant::now() < cooldown.active_until && lag <= cooldown.max_lag {
                                        return;
                                    }
                                } else {
                                    // Still loading? We shouldn't be here
                                }
                                let lag = humantime::format_duration(lag);
                                warn!(%rate_limit_stats, %lag, "ingesting record with timestamp lagging more than wait_lsn_timeout");
                            })
                        }
                    }
                    Err(e) => {
                        let delta_t = e.duration();
                        // determined by prod victoriametrics query: 1000 * (timestamp(node_time_seconds{neon_service="pageserver"}) - node_time_seconds)
                        // => https://www.robustperception.io/time-metric-from-the-node-exporter/
                        const IGNORED_DRIFT: Duration = Duration::from_millis(100);
                        if delta_t > IGNORED_DRIFT {
                            let delta_t = humantime::format_duration(delta_t);
                            rate_limits.future_lsn_msg_ratelimit.call2(|rate_limit_stats| {
                                warn!(%rate_limit_stats, %delta_t, "ingesting record with timestamp from future");
                            })
                        }
                    }
                };
            }
            Err(error) => {
                rate_limits.timestamp_invalid_msg_ratelimit.call2(|rate_limit_stats| {
                    warn!(%rate_limit_stats, %error, "ingesting record with invalid timestamp, cannot calculate lag and will fail find-lsn-for-timestamp type queries");
                })
            }
        }
    }

    /// Subroutine of ingest_record(), to handle XLOG_XACT_* records.
    async fn ingest_xact_record(
        &mut self,
        record: XactRecord,
        modification: &mut DatadirModification<'_>,
        ctx: &RequestContext,
    ) -> Result<(), WalIngestError> {
        let (xact_common, is_commit, is_prepared) = match record {
            XactRecord::Prepare(XactPrepare { xl_xid, data }) => {
                let xid: u64 = if modification.tline.pg_version >= PgMajorVersion::PG17 {
                    self.adjust_to_full_transaction_id(xl_xid)?
                } else {
                    xl_xid as u64
                };
                return modification.put_twophase_file(xid, data, ctx).await;
            }
            XactRecord::Commit(common) => (common, true, false),
            XactRecord::Abort(common) => (common, false, false),
            XactRecord::CommitPrepared(common) => (common, true, true),
            XactRecord::AbortPrepared(common) => (common, false, true),
        };

        let XactCommon {
            parsed,
            origin_id,
            xl_xid,
            lsn,
        } = xact_common;

        // Record update of CLOG pages
        let mut pageno = parsed.xid / pg_constants::CLOG_XACTS_PER_PAGE;
        let mut segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
        let mut rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
        let mut page_xids: Vec<TransactionId> = vec![parsed.xid];
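
        // Worked example for the arithmetic above (assuming the stock Postgres
        // constants: 32768 transaction status bits per CLOG page and 32 pages
        // per SLRU segment): xid 100000 lands on pageno 3, segno 0, rpageno 3.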

        self.warn_on_ingest_lag(modification.tline.conf, parsed.xact_time);

        for subxact in &parsed.subxacts {
            let subxact_pageno = subxact / pg_constants::CLOG_XACTS_PER_PAGE;
            if subxact_pageno != pageno {
                // This subxact goes to a different page. Write the record
                // for all the XIDs on the previous page, and continue
                // accumulating XIDs on this new page.
                modification.put_slru_wal_record(
                    SlruKind::Clog,
                    segno,
                    rpageno,
                    if is_commit {
                        NeonWalRecord::ClogSetCommitted {
                            xids: page_xids,
                            timestamp: parsed.xact_time,
                        }
                    } else {
                        NeonWalRecord::ClogSetAborted { xids: page_xids }
                    },
                )?;
                page_xids = Vec::new();
            }
            pageno = subxact_pageno;
            segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
            rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
            page_xids.push(*subxact);
        }
        modification.put_slru_wal_record(
            SlruKind::Clog,
            segno,
            rpageno,
            if is_commit {
                NeonWalRecord::ClogSetCommitted {
                    xids: page_xids,
                    timestamp: parsed.xact_time,
                }
            } else {
                NeonWalRecord::ClogSetAborted { xids: page_xids }
            },
        )?;

        // Group relations to drop by dbNode. This map will contain all relations that _might_
        // exist, we will reduce it to which ones really exist later. This map can be huge if
        // the transaction touches a huge number of relations (there is no bound on this in
        // postgres).
        let mut drop_relations: HashMap<(u32, u32), Vec<RelTag>> = HashMap::new();

        for xnode in &parsed.xnodes {
            for forknum in MAIN_FORKNUM..=INIT_FORKNUM {
                let rel = RelTag {
                    forknum,
                    spcnode: xnode.spcnode,
                    dbnode: xnode.dbnode,
                    relnode: xnode.relnode,
                };
                drop_relations
                    .entry((xnode.spcnode, xnode.dbnode))
                    .or_default()
                    .push(rel);
            }
        }

        // Execute relation drops in a batch: the number may be huge, so deleting individually is prohibitively expensive
        modification.put_rel_drops(drop_relations, ctx).await?;

        if origin_id != 0 {
            modification
                .set_replorigin(origin_id, parsed.origin_lsn)
                .await?;
        }

        if is_prepared {
            // Remove twophase file. see RemoveTwoPhaseFile() in postgres code
            trace!(
                "Drop twophaseFile for xid {} parsed_xact.xid {} here at {}",
                xl_xid, parsed.xid, lsn,
            );

            let xid: u64 = if modification.tline.pg_version >= PgMajorVersion::PG17 {
                self.adjust_to_full_transaction_id(parsed.xid)?
            } else {
                parsed.xid as u64
            };
            modification.drop_twophase_file(xid, ctx).await?;
        }

        Ok(())
    }

    async fn ingest_clog_truncate(
        &mut self,
        truncate: ClogTruncate,
        modification: &mut DatadirModification<'_>,
        ctx: &RequestContext,
    ) -> Result<(), WalIngestError> {
        let ClogTruncate {
            pageno,
            oldest_xid,
            oldest_xid_db,
        } = truncate;

        info!(
            "RM_CLOG_ID truncate pageno {} oldestXid {} oldestXidDB {}",
            pageno, oldest_xid, oldest_xid_db
        );

        // In Postgres, oldestXid and oldestXidDB are updated in memory when the CLOG is
        // truncated, but a checkpoint record with the updated values isn't written until
        // later. In Neon, a server can start at any LSN, not just on a checkpoint record,
        // so we keep the oldestXid and oldestXidDB up-to-date.
        enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
            cp.oldestXid = oldest_xid;
            cp.oldestXidDB = oldest_xid_db;
        });
        self.checkpoint_modified = true;

        // TODO Treat AdvanceOldestClogXid() or write a comment why we don't need it

        let latest_page_number =
            enum_pgversion_dispatch!(self.checkpoint, CheckPoint, cp, { cp.nextXid.value }) as u32
                / pg_constants::CLOG_XACTS_PER_PAGE;

        // Now delete all segments containing pages between xlrec.pageno
        // and latest_page_number.

        // First, make an important safety check:
        // the current endpoint page must not be eligible for removal.
        // See SimpleLruTruncate() in slru.c
        if dispatch_pgversion!(modification.tline.pg_version, {
            pgv::nonrelfile_utils::clogpage_precedes(latest_page_number, pageno)
        }) {
            info!("could not truncate directory pg_xact apparent wraparound");
            return Ok(());
        }

        // Iterate over SLRU CLOG segments and drop segments that we're ready to truncate
        //
        // We cannot pass 'lsn' to the Timeline.list_nonrels(), or it
        // will block waiting for the last valid LSN to advance up to
        // it. So we use the previous record's LSN in the get calls
        // instead.
        if modification.tline.get_shard_identity().is_shard_zero() {
            for segno in modification
                .tline
                .list_slru_segments(SlruKind::Clog, Version::Modified(modification), ctx)
                .await?
            {
                let segpage = segno * pg_constants::SLRU_PAGES_PER_SEGMENT;

                let may_delete = dispatch_pgversion!(modification.tline.pg_version, {
                    pgv::nonrelfile_utils::slru_may_delete_clogsegment(segpage, pageno)
                });

                if may_delete {
                    modification
                        .drop_slru_segment(SlruKind::Clog, segno, ctx)
                        .await?;
                    trace!("Drop CLOG segment {:>04X}", segno);
                }
            }
        }

        Ok(())
    }

    async fn ingest_clog_zero_page(
        &mut self,
        zero_page: ClogZeroPage,
        modification: &mut DatadirModification<'_>,
        ctx: &RequestContext,
    ) -> Result<(), WalIngestError> {
        let ClogZeroPage { segno, rpageno } = zero_page;

        self.put_slru_page_image(
            modification,
            SlruKind::Clog,
            segno,
            rpageno,
            ZERO_PAGE.clone(),
            ctx,
        )
        .await
    }

    fn ingest_multixact_create(
        &mut self,
        modification: &mut DatadirModification,
        xlrec: &XlMultiXactCreate,
    ) -> Result<(), WalIngestError> {
        // Create WAL record for updating the multixact-offsets page
        let pageno = xlrec.mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
        let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
        let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;

        modification.put_slru_wal_record(
            SlruKind::MultiXactOffsets,
            segno,
            rpageno,
            NeonWalRecord::MultixactOffsetCreate {
                mid: xlrec.mid,
                moff: xlrec.moff,
            },
        )?;

        // Create WAL records for the update of each affected multixact-members page
        let mut members = xlrec.members.iter();
        let mut offset = xlrec.moff;
        loop {
            let pageno = offset / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;

            // How many members fit on this page?
            let page_remain = pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32
                - offset % pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
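
            // Illustrative numbers (assuming the stock 8 KiB page layout,
            // where 1636 multixact members fit on a page): for moff 1630 and
            // 10 members, page_remain is 6, so 6 members land on this page
            // and the remaining 4 spill onto the next one.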

            let mut this_page_members: Vec<MultiXactMember> = Vec::new();
            for _ in 0..page_remain {
                if let Some(m) = members.next() {
                    this_page_members.push(m.clone());
                } else {
                    break;
                }
            }
            if this_page_members.is_empty() {
                // all done
                break;
            }
            let n_this_page = this_page_members.len();

            modification.put_slru_wal_record(
                SlruKind::MultiXactMembers,
                pageno / pg_constants::SLRU_PAGES_PER_SEGMENT,
                pageno % pg_constants::SLRU_PAGES_PER_SEGMENT,
                NeonWalRecord::MultixactMembersCreate {
                    moff: offset,
                    members: this_page_members,
                },
            )?;

            // Note: The multixact members can wrap around, even within one WAL record.
            offset = offset.wrapping_add(n_this_page as u32);
        }
        let next_offset = offset;
        assert!(xlrec.moff.wrapping_add(xlrec.nmembers) == next_offset);

        // Update next-multi-xid and next-offset
        //
        // NB: In PostgreSQL, the next-multi-xid stored in the control file is allowed to
        // go to 0, and it's fixed up by skipping to FirstMultiXactId in functions that
        // read it, like GetNewMultiXactId(). This is different from how nextXid is
        // incremented! nextXid skips over < FirstNormalTransactionId when the value
        // is stored, so it's never 0 in a checkpoint.
        //
        // I don't know why it's done that way, it seems less error-prone to skip over 0
        // when the value is stored rather than when it's read. But let's do it the same
        // way here.
        let next_multi_xid = xlrec.mid.wrapping_add(1);

        if self
            .checkpoint
            .update_next_multixid(next_multi_xid, next_offset)
        {
            self.checkpoint_modified = true;
        }

        // Also update the next-xid with the highest member. According to the comments in
        // multixact_redo(), this shouldn't be necessary, but let's do the same here.
        let max_mbr_xid = xlrec.members.iter().fold(None, |acc, mbr| {
            if let Some(max_xid) = acc {
                if mbr.xid.wrapping_sub(max_xid) as i32 > 0 {
                    Some(mbr.xid)
                } else {
                    acc
                }
            } else {
                Some(mbr.xid)
            }
        });

        if let Some(max_xid) = max_mbr_xid {
            if self.checkpoint.update_next_xid(max_xid) {
                self.checkpoint_modified = true;
            }
        }
        Ok(())
    }

    async fn ingest_multixact_truncate(
        &mut self,
        modification: &mut DatadirModification<'_>,
        xlrec: &XlMultiXactTruncate,
        ctx: &RequestContext,
    ) -> Result<(), WalIngestError> {
        let (maxsegment, startsegment, endsegment) =
            enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
                cp.oldestMulti = xlrec.end_trunc_off;
                cp.oldestMultiDB = xlrec.oldest_multi_db;
                let maxsegment: i32 = pgv::nonrelfile_utils::mx_offset_to_member_segment(
                    pg_constants::MAX_MULTIXACT_OFFSET,
                );
                let startsegment: i32 =
                    pgv::nonrelfile_utils::mx_offset_to_member_segment(xlrec.start_trunc_memb);
                let endsegment: i32 =
                    pgv::nonrelfile_utils::mx_offset_to_member_segment(xlrec.end_trunc_memb);
                (maxsegment, startsegment, endsegment)
            });

        self.checkpoint_modified = true;

        // PerformMembersTruncation
        let mut segment: i32 = startsegment;

        // Delete all the segments except the last one. The last segment can still
        // contain, possibly partially, valid data.
        if modification.tline.get_shard_identity().is_shard_zero() {
            while segment != endsegment {
                modification
                    .drop_slru_segment(SlruKind::MultiXactMembers, segment as u32, ctx)
                    .await?;

                /* move to next segment, handling wraparound correctly */
                if segment == maxsegment {
                    segment = 0;
                } else {
                    segment += 1;
                }
            }
        }

        // Truncate offsets
        // FIXME: this did not handle wraparound correctly

        Ok(())
    }

    async fn ingest_multixact_zero_page(
        &mut self,
        zero_page: MultiXactZeroPage,
        modification: &mut DatadirModification<'_>,
        ctx: &RequestContext,
    ) -> Result<(), WalIngestError> {
        let MultiXactZeroPage {
            slru_kind,
            segno,
            rpageno,
        } = zero_page;
        self.put_slru_page_image(
            modification,
            slru_kind,
            segno,
            rpageno,
            ZERO_PAGE.clone(),
            ctx,
        )
        .await
    }

    async fn ingest_relmap_update(
        &mut self,
        update: RelmapUpdate,
        modification: &mut DatadirModification<'_>,
        ctx: &RequestContext,
    ) -> Result<(), WalIngestError> {
        let RelmapUpdate { update, buf } = update;

        modification
            .put_relmap_file(update.tsid, update.dbid, buf, ctx)
            .await
    }

    async fn ingest_raw_xlog_record(
        &mut self,
        raw_record: RawXlogRecord,
        modification: &mut DatadirModification<'_>,
        ctx: &RequestContext,
    ) -> Result<(), WalIngestError> {
        let RawXlogRecord { info, lsn, mut buf } = raw_record;
        let pg_version = modification.tline.pg_version;

        if info == pg_constants::XLOG_PARAMETER_CHANGE {
            if let CheckPoint::V17(cp) = &mut self.checkpoint {
                let rec = v17::XlParameterChange::decode(&mut buf);
                cp.wal_level = rec.wal_level;
                self.checkpoint_modified = true;
            }
        } else if info == pg_constants::XLOG_END_OF_RECOVERY {
            if let CheckPoint::V17(cp) = &mut self.checkpoint {
                let rec = v17::XlEndOfRecovery::decode(&mut buf);
                cp.wal_level = rec.wal_level;
                self.checkpoint_modified = true;
            }
        }

        enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
            if info == pg_constants::XLOG_NEXTOID {
                let next_oid = buf.get_u32_le();
                if cp.nextOid != next_oid {
                    cp.nextOid = next_oid;
                    self.checkpoint_modified = true;
                }
            } else if info == pg_constants::XLOG_CHECKPOINT_ONLINE
                || info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
            {
                let mut checkpoint_bytes = [0u8; pgv::xlog_utils::SIZEOF_CHECKPOINT];
                buf.copy_to_slice(&mut checkpoint_bytes);
                let xlog_checkpoint = pgv::CheckPoint::decode(&checkpoint_bytes)?;
                trace!(
                    "xlog_checkpoint.oldestXid={}, checkpoint.oldestXid={}",
                    xlog_checkpoint.oldestXid, cp.oldestXid
                );
                if (cp.oldestXid.wrapping_sub(xlog_checkpoint.oldestXid) as i32) < 0 {
                    cp.oldestXid = xlog_checkpoint.oldestXid;
                }
                trace!(
                    "xlog_checkpoint.oldestActiveXid={}, checkpoint.oldestActiveXid={}",
                    xlog_checkpoint.oldestActiveXid, cp.oldestActiveXid
                );

                // A shutdown checkpoint has `oldestActiveXid == InvalidTransactionid`,
                // because at shutdown, all in-progress transactions will implicitly
                // end. Postgres startup code knows that, and allows hot standby to start
                // immediately from a shutdown checkpoint.
                //
                // In Neon, Postgres hot standby startup always behaves as if starting from
                // an online checkpoint. It needs a valid `oldestActiveXid` value, so
                // instead of overwriting self.checkpoint.oldestActiveXid with
                // InvalidTransactionid from the checkpoint WAL record, update it to a
                // proper value, knowing that there are no in-progress transactions at this
                // point, except for prepared transactions.
                //
                // See also the neon code changes in the InitWalRecovery() function.
                if xlog_checkpoint.oldestActiveXid == pg_constants::INVALID_TRANSACTION_ID
                    && info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
                {
                    let oldest_active_xid = if pg_version >= PgMajorVersion::PG17 {
                        let mut oldest_active_full_xid = cp.nextXid.value;
                        for xid in modification.tline.list_twophase_files(lsn, ctx).await? {
                            if xid < oldest_active_full_xid {
                                oldest_active_full_xid = xid;
                            }
                        }
                        oldest_active_full_xid as u32
                    } else {
                        let mut oldest_active_xid = cp.nextXid.value as u32;
                        for xid in modification.tline.list_twophase_files(lsn, ctx).await? {
                            let narrow_xid = xid as u32;
                            if (narrow_xid.wrapping_sub(oldest_active_xid) as i32) < 0 {
                                oldest_active_xid = narrow_xid;
                            }
                        }
                        oldest_active_xid
                    };
                    cp.oldestActiveXid = oldest_active_xid;
                } else {
                    cp.oldestActiveXid = xlog_checkpoint.oldestActiveXid;
                }
                // NB: We abuse the Checkpoint.redo field:
                //
                // - In PostgreSQL, the Checkpoint struct doesn't store the information
                //   of whether this is an online checkpoint or a shutdown checkpoint. It's
                //   stored in the XLOG info field of the WAL record, shutdown checkpoints
                //   use record type XLOG_CHECKPOINT_SHUTDOWN and online checkpoints use
                //   XLOG_CHECKPOINT_ONLINE. We don't store the original WAL record headers
                //   in the pageserver, however.
                //
                // - In PostgreSQL, the Checkpoint.redo field stores the *start* of the
                //   checkpoint record, if it's a shutdown checkpoint. But when we are
                //   starting from a shutdown checkpoint, the basebackup LSN is the *end*
                //   of the shutdown checkpoint WAL record. That makes it difficult to
                //   correctly detect whether we're starting from a shutdown record or
                //   not.
                //
                // To address both of those issues, we store 0 in the redo field if it's
                // an online checkpoint record, and the record's *end* LSN if it's a
                // shutdown checkpoint. We don't need the original redo pointer in neon,
                // because we don't perform WAL replay at startup anyway, so we can get
                // away with abusing the redo field like this.
                //
                // XXX: Ideally, we would persist the extra information in a more
                // explicit format, rather than repurpose the fields of the Postgres
                // struct like this. However, we already have persisted data like this,
                // so we need to maintain backwards compatibility.
                //
                // NB: We didn't originally have this convention, so there are still old
                // persisted records that didn't do this. Before, we didn't update the
                // persisted redo field at all. That means that old records have a bogus
                // redo pointer that points to some old value, from the checkpoint record
                // that was originally imported from the data directory. If it was a
                // project created in Neon, that means it points to the first checkpoint
                // after initdb. That's OK for our purposes: all such old checkpoints are
                // treated as old online checkpoints when the basebackup is created.
                cp.redo = if info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN {
                    // Store the *end* LSN of the checkpoint record. Or to be precise,
                    // the start LSN of the *next* record, i.e. if the record ends
                    // exactly at page boundary, the redo LSN points to just after the
                    // page header on the next page.
                    lsn.into()
                } else {
                    Lsn::INVALID.into()
                };

                // Write a new checkpoint key-value pair on every checkpoint record, even
                // if nothing really changed. Not strictly required, but it seems nice to
                // have some trace of the checkpoint records in the layer files at the same
                // LSNs.
                self.checkpoint_modified = true;
            }
        });

        if info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN {
            modification.tline.prepare_basebackup(lsn);
        }

        Ok(())
    }

    async fn ingest_logical_message_put(
        &mut self,
        put: PutLogicalMessage,
        modification: &mut DatadirModification<'_>,
        ctx: &RequestContext,
    ) -> Result<(), WalIngestError> {
        let PutLogicalMessage { path, buf } = put;
        modification.put_file(path.as_str(), &buf, ctx).await
    }

    fn ingest_standby_record(&mut self, record: StandbyRecord) -> Result<(), WalIngestError> {
        match record {
            StandbyRecord::RunningXacts(running_xacts) => {
                enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
                    cp.oldestActiveXid = running_xacts.oldest_running_xid;
                });

                self.checkpoint_modified = true;
            }
        }

        Ok(())
    }

    async fn ingest_replorigin_record(
        &mut self,
        record: ReploriginRecord,
        modification: &mut DatadirModification<'_>,
    ) -> Result<(), WalIngestError> {
        match record {
            ReploriginRecord::Set(set) => {
                modification
                    .set_replorigin(set.node_id, set.remote_lsn)
                    .await?;
            }
            ReploriginRecord::Drop(drop) => {
                modification.drop_replorigin(drop.node_id).await?;
            }
        }

        Ok(())
    }

    async fn put_rel_creation(
        &mut self,
        modification: &mut DatadirModification<'_>,
        rel: RelTag,
        ctx: &RequestContext,
    ) -> Result<(), WalIngestError> {
        modification.put_rel_creation(rel, 0, ctx).await?;
        Ok(())
    }

    #[cfg(test)]
    async fn put_rel_page_image(
        &mut self,
        modification: &mut DatadirModification<'_>,
        rel: RelTag,
        blknum: BlockNumber,
        img: Bytes,
        ctx: &RequestContext,
    ) -> Result<(), WalIngestError> {
        self.handle_rel_extend(modification, rel, blknum, ctx)
            .await?;
        modification.put_rel_page_image(rel, blknum, img)?;
        Ok(())
    }

    async fn put_rel_wal_record(
        &mut self,
        modification: &mut DatadirModification<'_>,
        rel: RelTag,
        blknum: BlockNumber,
        rec: NeonWalRecord,
        ctx: &RequestContext,
    ) -> Result<(), WalIngestError> {
        self.handle_rel_extend(modification, rel, blknum, ctx)
            .await?;
        modification.put_rel_wal_record(rel, blknum, rec)?;
        Ok(())
    }

    async fn put_rel_truncation(
        &mut self,
        modification: &mut DatadirModification<'_>,
        rel: RelTag,
        nblocks: BlockNumber,
        ctx: &RequestContext,
    ) -> Result<(), WalIngestError> {
        modification.put_rel_truncation(rel, nblocks, ctx).await?;
        Ok(())
    }

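    /// Extends the relation to hold `blknum`, zero-filling any gap blocks this
    /// shard owns.
    ///
    /// Illustrative example: if the relation currently has 3 blocks and a WAL
    /// record touches block 7, `new_nblocks` becomes 8 and blocks 3..=6 (those
    /// local to this shard) are written as zero pages.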
    async fn handle_rel_extend(
        &mut self,
        modification: &mut DatadirModification<'_>,
        rel: RelTag,
        blknum: BlockNumber,
        ctx: &RequestContext,
    ) -> Result<(), WalIngestError> {
        let new_nblocks = blknum + 1;
        // Check if the relation exists. We implicitly create relations on first
        // record.
        let old_nblocks = modification.create_relation_if_required(rel, ctx).await?;

        if new_nblocks > old_nblocks {
            //info!("extending {} {} to {}", rel, old_nblocks, new_nblocks);
            modification.put_rel_extend(rel, new_nblocks, ctx).await?;

            let mut key = rel_block_to_key(rel, blknum);

            // fill the gap with zeros
            let mut gap_blocks_filled: u64 = 0;
            for gap_blknum in old_nblocks..blknum {
                key.field6 = gap_blknum;

                if self.shard.get_shard_number(&key) != self.shard.number {
                    continue;
                }

                modification.put_rel_page_image_zero(rel, gap_blknum)?;
                gap_blocks_filled += 1;
            }

            WAL_INGEST
                .gap_blocks_zeroed_on_rel_extend
                .inc_by(gap_blocks_filled);

            // Log something when relation extends cause us to fill gaps
            // with zero pages. Logging is rate limited per pg version to
            // avoid skewing.
            if gap_blocks_filled > 0 {
                use std::sync::Mutex;

                use once_cell::sync::Lazy;
                use utils::rate_limit::RateLimit;

                struct RateLimitPerPgVersion {
                    rate_limiters: [Lazy<Mutex<RateLimit>>; 4],
                }

                impl RateLimitPerPgVersion {
                    const fn new() -> Self {
                        Self {
                            rate_limiters: [const {
                                Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(30))))
                            }; 4],
                        }
                    }

                    const fn rate_limiter(
                        &self,
                        pg_version: PgMajorVersion,
                    ) -> Option<&Lazy<Mutex<RateLimit>>> {
                        const MIN_PG_VERSION: u32 = PgMajorVersion::PG14.major_version_num();
                        const MAX_PG_VERSION: u32 = PgMajorVersion::PG17.major_version_num();
                        let pg_version = pg_version.major_version_num();

                        if pg_version < MIN_PG_VERSION || pg_version > MAX_PG_VERSION {
                            return None;
                        }

                        Some(&self.rate_limiters[(pg_version - MIN_PG_VERSION) as usize])
                    }
                }

                static LOGGED: RateLimitPerPgVersion = RateLimitPerPgVersion::new();
                if let Some(rate_limiter) = LOGGED.rate_limiter(modification.tline.pg_version) {
                    if let Ok(mut locked) = rate_limiter.try_lock() {
                        locked.call(|| {
                            info!(
                                lsn=%modification.get_lsn(),
                                pg_version=%modification.tline.pg_version,
                                rel=%rel,
                                "Filled {} gap blocks on rel extend to {} from {}",
                                gap_blocks_filled,
                                new_nblocks,
                                old_nblocks);
                        });
                    }
                }
            }
        }
        Ok(())
    }

    async fn put_slru_page_image(
        &mut self,
        modification: &mut DatadirModification<'_>,
        kind: SlruKind,
        segno: u32,
        blknum: BlockNumber,
        img: Bytes,
        ctx: &RequestContext,
    ) -> Result<(), WalIngestError> {
        if !self.shard.is_shard_zero() {
            return Ok(());
        }

        self.handle_slru_extend(modification, kind, segno, blknum, ctx)
            .await?;
        modification.put_slru_page_image(kind, segno, blknum, img)?;
        Ok(())
    }

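    /// Extends the SLRU segment to hold `blknum`, zero-filling any gap and
    /// creating the segment first if it does not exist yet.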
    async fn handle_slru_extend(
        &mut self,
        modification: &mut DatadirModification<'_>,
        kind: SlruKind,
        segno: u32,
        blknum: BlockNumber,
        ctx: &RequestContext,
    ) -> Result<(), WalIngestError> {
        // We don't use a cache for this like we do for relations. SLRUs are explicitly
        // extended with ZEROPAGE records, not with commit records, so it happens
        // a lot less frequently.

        let new_nblocks = blknum + 1;
        // Check if the relation exists. We implicitly create relations on first
        // record.
        // TODO: would be nice to be more explicit about it
        let old_nblocks = if !modification
            .tline
            .get_slru_segment_exists(kind, segno, Version::Modified(modification), ctx)
            .await?
        {
            // create it with 0 size initially, the logic below will extend it
            modification
                .put_slru_segment_creation(kind, segno, 0, ctx)
                .await?;
            0
        } else {
            modification
                .tline
                .get_slru_segment_size(kind, segno, Version::Modified(modification), ctx)
                .await?
        };

        if new_nblocks > old_nblocks {
            trace!(
                "extending SLRU {:?} seg {} from {} to {} blocks",
                kind, segno, old_nblocks, new_nblocks
            );
            modification.put_slru_extend(kind, segno, new_nblocks)?;

            // fill the gap with zeros
            for gap_blknum in old_nblocks..blknum {
                modification.put_slru_page_image_zero(kind, segno, gap_blknum)?;
            }
        }
        Ok(())
    }
}

/// Returns the size of the relation as of this modification, or None if the relation doesn't exist.
///
/// This is only accurate on shard 0. On other shards, it will return the size up to the highest
/// page number stored in the shard, or None if the shard does not have any pages for it.
async fn get_relsize(
    modification: &DatadirModification<'_>,
    rel: RelTag,
    ctx: &RequestContext,
) -> Result<Option<BlockNumber>, PageReconstructError> {
    if !modification
        .tline
        .get_rel_exists(rel, Version::Modified(modification), ctx)
        .await?
    {
        return Ok(None);
    }
    modification
        .tline
        .get_rel_size(rel, Version::Modified(modification), ctx)
        .await
        .map(Some)
}

#[allow(clippy::bool_assert_comparison)]
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use anyhow::Result;
|
|
use postgres_ffi::PgMajorVersion;
|
|
use postgres_ffi::RELSEG_SIZE;
|
|
|
|
use super::*;
|
|
use crate::DEFAULT_PG_VERSION;
|
|
use crate::tenant::harness::*;
|
|
use crate::tenant::remote_timeline_client::{INITDB_PATH, remote_initdb_archive_path};
|
|
use crate::tenant::storage_layer::IoConcurrency;
|
|
|
|
/// Arbitrary relation tag, for testing.
|
|
const TESTREL_A: RelTag = RelTag {
|
|
spcnode: 0,
|
|
dbnode: 111,
|
|
relnode: 1000,
|
|
forknum: 0,
|
|
};
|
|
|
|
fn assert_current_logical_size(_timeline: &Timeline, _lsn: Lsn) {
|
|
// TODO
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_zeroed_checkpoint_decodes_correctly() -> Result<(), anyhow::Error> {
|
|
for i in PgMajorVersion::ALL {
|
|
dispatch_pgversion!(i, {
|
|
pgv::CheckPoint::decode(&pgv::ZERO_CHECKPOINT)?;
|
|
});
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
async fn init_walingest_test(tline: &Timeline, ctx: &RequestContext) -> Result<WalIngest> {
|
|
let mut m = tline.begin_modification(Lsn(0x10));
|
|
m.put_checkpoint(dispatch_pgversion!(
|
|
tline.pg_version,
|
|
pgv::ZERO_CHECKPOINT.clone()
|
|
))?;
|
|
m.put_relmap_file(0, 111, Bytes::from(""), ctx).await?; // dummy relmapper file
|
|
m.commit(ctx).await?;
|
|
let walingest = WalIngest::new(tline, Lsn(0x10), ctx).await?;
|
|
|
|
Ok(walingest)
|
|
}
|
|
|
|
    #[tokio::test]
    async fn test_relsize() -> Result<()> {
        let (tenant, ctx) = TenantHarness::create("test_relsize").await?.load().await;
        let io_concurrency = IoConcurrency::spawn_for_test();
        let tline = tenant
            .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
            .await?;
        let mut walingest = init_walingest_test(&tline, &ctx).await?;

        let mut m = tline.begin_modification(Lsn(0x20));
        walingest.put_rel_creation(&mut m, TESTREL_A, &ctx).await?;
        walingest
            .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 2"), &ctx)
            .await?;
        m.commit(&ctx).await?;
        let mut m = tline.begin_modification(Lsn(0x30));
        walingest
            .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 3"), &ctx)
            .await?;
        m.commit(&ctx).await?;
        let mut m = tline.begin_modification(Lsn(0x40));
        walingest
            .put_rel_page_image(&mut m, TESTREL_A, 1, test_img("foo blk 1 at 4"), &ctx)
            .await?;
        m.commit(&ctx).await?;
        let mut m = tline.begin_modification(Lsn(0x50));
        walingest
            .put_rel_page_image(&mut m, TESTREL_A, 2, test_img("foo blk 2 at 5"), &ctx)
            .await?;
        m.commit(&ctx).await?;

        assert_current_logical_size(&tline, Lsn(0x50));

        let test_span = tracing::info_span!(parent: None, "test",
            tenant_id=%tline.tenant_shard_id.tenant_id,
            shard_id=%tline.tenant_shard_id.shard_slug(),
            timeline_id=%tline.timeline_id);

        // The relation was created at LSN 0x20 and is not visible at LSN 0x10 yet.
        assert_eq!(
            tline
                .get_rel_exists(TESTREL_A, Version::at(Lsn(0x10)), &ctx)
                .await?,
            false
        );
        assert!(
            tline
                .get_rel_size(TESTREL_A, Version::at(Lsn(0x10)), &ctx)
                .await
                .is_err()
        );
        assert_eq!(
            tline
                .get_rel_exists(TESTREL_A, Version::at(Lsn(0x20)), &ctx)
                .await?,
            true
        );
        assert_eq!(
            tline
                .get_rel_size(TESTREL_A, Version::at(Lsn(0x20)), &ctx)
                .await?,
            1
        );
        assert_eq!(
            tline
                .get_rel_size(TESTREL_A, Version::at(Lsn(0x50)), &ctx)
                .await?,
            3
        );

        // Check page contents at each LSN
        assert_eq!(
            tline
                .get_rel_page_at_lsn(
                    TESTREL_A,
                    0,
                    Version::at(Lsn(0x20)),
                    &ctx,
                    io_concurrency.clone()
                )
                .instrument(test_span.clone())
                .await?,
            test_img("foo blk 0 at 2")
        );

        assert_eq!(
            tline
                .get_rel_page_at_lsn(
                    TESTREL_A,
                    0,
                    Version::at(Lsn(0x30)),
                    &ctx,
                    io_concurrency.clone()
                )
                .instrument(test_span.clone())
                .await?,
            test_img("foo blk 0 at 3")
        );

        assert_eq!(
            tline
                .get_rel_page_at_lsn(
                    TESTREL_A,
                    0,
                    Version::at(Lsn(0x40)),
                    &ctx,
                    io_concurrency.clone()
                )
                .instrument(test_span.clone())
                .await?,
            test_img("foo blk 0 at 3")
        );
        assert_eq!(
            tline
                .get_rel_page_at_lsn(
                    TESTREL_A,
                    1,
                    Version::at(Lsn(0x40)),
                    &ctx,
                    io_concurrency.clone()
                )
                .instrument(test_span.clone())
                .await?,
            test_img("foo blk 1 at 4")
        );

        assert_eq!(
            tline
                .get_rel_page_at_lsn(
                    TESTREL_A,
                    0,
                    Version::at(Lsn(0x50)),
                    &ctx,
                    io_concurrency.clone()
                )
                .instrument(test_span.clone())
                .await?,
            test_img("foo blk 0 at 3")
        );
        assert_eq!(
            tline
                .get_rel_page_at_lsn(
                    TESTREL_A,
                    1,
                    Version::at(Lsn(0x50)),
                    &ctx,
                    io_concurrency.clone()
                )
                .instrument(test_span.clone())
                .await?,
            test_img("foo blk 1 at 4")
        );
        assert_eq!(
            tline
                .get_rel_page_at_lsn(
                    TESTREL_A,
                    2,
                    Version::at(Lsn(0x50)),
                    &ctx,
                    io_concurrency.clone()
                )
                .instrument(test_span.clone())
                .await?,
            test_img("foo blk 2 at 5")
        );

        // Truncate last block
        let mut m = tline.begin_modification(Lsn(0x60));
        walingest
            .put_rel_truncation(&mut m, TESTREL_A, 2, &ctx)
            .await?;
        m.commit(&ctx).await?;
        assert_current_logical_size(&tline, Lsn(0x60));

        // Check reported size and contents after truncation
        assert_eq!(
            tline
                .get_rel_size(TESTREL_A, Version::at(Lsn(0x60)), &ctx)
                .await?,
            2
        );
        assert_eq!(
            tline
                .get_rel_page_at_lsn(
                    TESTREL_A,
                    0,
                    Version::at(Lsn(0x60)),
                    &ctx,
                    io_concurrency.clone()
                )
                .instrument(test_span.clone())
                .await?,
            test_img("foo blk 0 at 3")
        );
        assert_eq!(
            tline
                .get_rel_page_at_lsn(
                    TESTREL_A,
                    1,
                    Version::at(Lsn(0x60)),
                    &ctx,
                    io_concurrency.clone()
                )
                .instrument(test_span.clone())
                .await?,
            test_img("foo blk 1 at 4")
        );

        // We should still see the truncated block at the older LSN.
        assert_eq!(
            tline
                .get_rel_size(TESTREL_A, Version::at(Lsn(0x50)), &ctx)
                .await?,
            3
        );
        assert_eq!(
            tline
                .get_rel_page_at_lsn(
                    TESTREL_A,
                    2,
                    Version::at(Lsn(0x50)),
                    &ctx,
                    io_concurrency.clone()
                )
                .instrument(test_span.clone())
                .await?,
            test_img("foo blk 2 at 5")
        );

        // Truncate to zero length
        let mut m = tline.begin_modification(Lsn(0x68));
        walingest
            .put_rel_truncation(&mut m, TESTREL_A, 0, &ctx)
            .await?;
        m.commit(&ctx).await?;
        assert_eq!(
            tline
                .get_rel_size(TESTREL_A, Version::at(Lsn(0x68)), &ctx)
                .await?,
            0
        );

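        // Extending a relation past its current end implicitly creates the
        // intervening blocks: reads of the gap blocks return ZERO_PAGE, as
        // asserted below.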
        // Extend from 0 to 2 blocks, leaving a gap
        let mut m = tline.begin_modification(Lsn(0x70));
        walingest
            .put_rel_page_image(&mut m, TESTREL_A, 1, test_img("foo blk 1"), &ctx)
            .await?;
        m.commit(&ctx).await?;
        assert_eq!(
            tline
                .get_rel_size(TESTREL_A, Version::at(Lsn(0x70)), &ctx)
                .await?,
            2
        );
        assert_eq!(
            tline
                .get_rel_page_at_lsn(
                    TESTREL_A,
                    0,
                    Version::at(Lsn(0x70)),
                    &ctx,
                    io_concurrency.clone()
                )
                .instrument(test_span.clone())
                .await?,
            ZERO_PAGE
        );
        assert_eq!(
            tline
                .get_rel_page_at_lsn(
                    TESTREL_A,
                    1,
                    Version::at(Lsn(0x70)),
                    &ctx,
                    io_concurrency.clone()
                )
                .instrument(test_span.clone())
                .await?,
            test_img("foo blk 1")
        );

        // Extend a lot more, leaving a big gap that spans across segments
        let mut m = tline.begin_modification(Lsn(0x80));
        walingest
            .put_rel_page_image(&mut m, TESTREL_A, 1500, test_img("foo blk 1500"), &ctx)
            .await?;
        m.commit(&ctx).await?;
        assert_eq!(
            tline
                .get_rel_size(TESTREL_A, Version::at(Lsn(0x80)), &ctx)
                .await?,
            1501
        );
        for blk in 2..1500 {
            assert_eq!(
                tline
                    .get_rel_page_at_lsn(
                        TESTREL_A,
                        blk,
                        Version::at(Lsn(0x80)),
                        &ctx,
                        io_concurrency.clone()
                    )
                    .instrument(test_span.clone())
                    .await?,
                ZERO_PAGE
            );
        }
        assert_eq!(
            tline
                .get_rel_page_at_lsn(
                    TESTREL_A,
                    1500,
                    Version::at(Lsn(0x80)),
                    &ctx,
                    io_concurrency.clone()
                )
                .instrument(test_span.clone())
                .await?,
            test_img("foo blk 1500")
        );

        Ok(())
    }

    // Test what happens if we drop a relation
    // and then re-create it within the same layer.
    #[tokio::test]
    async fn test_drop_extend() -> Result<()> {
        let (tenant, ctx) = TenantHarness::create("test_drop_extend")
            .await?
            .load()
            .await;
        let tline = tenant
            .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
            .await?;
        let mut walingest = init_walingest_test(&tline, &ctx).await?;

        let mut m = tline.begin_modification(Lsn(0x20));
        walingest
            .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 2"), &ctx)
            .await?;
        m.commit(&ctx).await?;

        // Check that rel exists and size is correct
        assert_eq!(
            tline
                .get_rel_exists(TESTREL_A, Version::at(Lsn(0x20)), &ctx)
                .await?,
            true
        );
        assert_eq!(
            tline
                .get_rel_size(TESTREL_A, Version::at(Lsn(0x20)), &ctx)
                .await?,
            1
        );

        // Drop rel
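        // put_rel_drops takes the relations to drop grouped by their
        // (spcnode, dbnode) key.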
        let mut m = tline.begin_modification(Lsn(0x30));
        let mut rel_drops = HashMap::new();
        rel_drops.insert((TESTREL_A.spcnode, TESTREL_A.dbnode), vec![TESTREL_A]);
        m.put_rel_drops(rel_drops, &ctx).await?;
        m.commit(&ctx).await?;

        // Check that rel is not visible anymore
        assert_eq!(
            tline
                .get_rel_exists(TESTREL_A, Version::at(Lsn(0x30)), &ctx)
                .await?,
            false
        );

        // FIXME: should fail
        //assert!(tline.get_rel_size(TESTREL_A, Lsn(0x30), false)?.is_none());

        // Re-create it
        let mut m = tline.begin_modification(Lsn(0x40));
        walingest
            .put_rel_page_image(&mut m, TESTREL_A, 0, test_img("foo blk 0 at 4"), &ctx)
            .await?;
        m.commit(&ctx).await?;

        // Check that rel exists and size is correct
        assert_eq!(
            tline
                .get_rel_exists(TESTREL_A, Version::at(Lsn(0x40)), &ctx)
                .await?,
            true
        );
        assert_eq!(
            tline
                .get_rel_size(TESTREL_A, Version::at(Lsn(0x40)), &ctx)
                .await?,
            1
        );

        Ok(())
    }

    // Test what happens if we truncate a relation
    // so that one of its segments is dropped,
    // and then extend it again within the same layer.
    #[tokio::test]
    async fn test_truncate_extend() -> Result<()> {
        let (tenant, ctx) = TenantHarness::create("test_truncate_extend")
            .await?
            .load()
            .await;
        let io_concurrency = IoConcurrency::spawn_for_test();
        let tline = tenant
            .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
            .await?;
        let mut walingest = init_walingest_test(&tline, &ctx).await?;

        // Create a 20 MB relation (the size is arbitrary)
        let relsize = 20 * 1024 * 1024 / 8192;
        let mut m = tline.begin_modification(Lsn(0x20));
        for blkno in 0..relsize {
            let data = format!("foo blk {} at {}", blkno, Lsn(0x20));
            walingest
                .put_rel_page_image(&mut m, TESTREL_A, blkno, test_img(&data), &ctx)
                .await?;
        }
        m.commit(&ctx).await?;

        let test_span = tracing::info_span!(parent: None, "test",
            tenant_id=%tline.tenant_shard_id.tenant_id,
            shard_id=%tline.tenant_shard_id.shard_slug(),
            timeline_id=%tline.timeline_id);

        // The relation was created at LSN 0x20 and is not visible at LSN 0x10 yet.
        assert_eq!(
            tline
                .get_rel_exists(TESTREL_A, Version::at(Lsn(0x10)), &ctx)
                .await?,
            false
        );
        assert!(
            tline
                .get_rel_size(TESTREL_A, Version::at(Lsn(0x10)), &ctx)
                .await
                .is_err()
        );

        assert_eq!(
            tline
                .get_rel_exists(TESTREL_A, Version::at(Lsn(0x20)), &ctx)
                .await?,
            true
        );
        assert_eq!(
            tline
                .get_rel_size(TESTREL_A, Version::at(Lsn(0x20)), &ctx)
                .await?,
            relsize
        );

        // Check relation content
        for blkno in 0..relsize {
            let lsn = Lsn(0x20);
            let data = format!("foo blk {blkno} at {lsn}");
            assert_eq!(
                tline
                    .get_rel_page_at_lsn(
                        TESTREL_A,
                        blkno,
                        Version::at(lsn),
                        &ctx,
                        io_concurrency.clone()
                    )
                    .instrument(test_span.clone())
                    .await?,
                test_img(&data)
            );
        }

        // Truncate the relation so that the second segment is dropped,
        // leaving only one page.
        let mut m = tline.begin_modification(Lsn(0x60));
        walingest
            .put_rel_truncation(&mut m, TESTREL_A, 1, &ctx)
            .await?;
        m.commit(&ctx).await?;

        // Check reported size and contents after truncation
        assert_eq!(
            tline
                .get_rel_size(TESTREL_A, Version::at(Lsn(0x60)), &ctx)
                .await?,
            1
        );

        for blkno in 0..1 {
            let lsn = Lsn(0x20);
            let data = format!("foo blk {blkno} at {lsn}");
            assert_eq!(
                tline
                    .get_rel_page_at_lsn(
                        TESTREL_A,
                        blkno,
                        Version::at(Lsn(0x60)),
                        &ctx,
                        io_concurrency.clone()
                    )
                    .instrument(test_span.clone())
                    .await?,
                test_img(&data)
            );
        }

        // We should still see all blocks at the older LSN.
        assert_eq!(
            tline
                .get_rel_size(TESTREL_A, Version::at(Lsn(0x50)), &ctx)
                .await?,
            relsize
        );
        for blkno in 0..relsize {
            let lsn = Lsn(0x20);
            let data = format!("foo blk {blkno} at {lsn}");
            assert_eq!(
                tline
                    .get_rel_page_at_lsn(
                        TESTREL_A,
                        blkno,
                        Version::at(Lsn(0x50)),
                        &ctx,
                        io_concurrency.clone()
                    )
                    .instrument(test_span.clone())
                    .await?,
                test_img(&data)
            );
        }

        // Extend the relation again,
        // adding enough blocks to create a second segment.
        let lsn = Lsn(0x80);
        let mut m = tline.begin_modification(lsn);
        for blkno in 0..relsize {
            let data = format!("foo blk {blkno} at {lsn}");
            walingest
                .put_rel_page_image(&mut m, TESTREL_A, blkno, test_img(&data), &ctx)
                .await?;
        }
        m.commit(&ctx).await?;

        assert_eq!(
            tline
                .get_rel_exists(TESTREL_A, Version::at(Lsn(0x80)), &ctx)
                .await?,
            true
        );
        assert_eq!(
            tline
                .get_rel_size(TESTREL_A, Version::at(Lsn(0x80)), &ctx)
                .await?,
            relsize
        );
        // Check relation content
        for blkno in 0..relsize {
            let lsn = Lsn(0x80);
            let data = format!("foo blk {blkno} at {lsn}");
            assert_eq!(
                tline
                    .get_rel_page_at_lsn(
                        TESTREL_A,
                        blkno,
                        Version::at(Lsn(0x80)),
                        &ctx,
                        io_concurrency.clone()
                    )
                    .instrument(test_span.clone())
                    .await?,
                test_img(&data)
            );
        }

        Ok(())
    }

    /// Test get_rel_size() and truncation with a file larger than 1 GB, so that it's
    /// split into multiple 1 GB segments in Postgres.
    #[tokio::test]
    async fn test_large_rel() -> Result<()> {
        let (tenant, ctx) = TenantHarness::create("test_large_rel").await?.load().await;
        let tline = tenant
            .create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
            .await?;
        let mut walingest = init_walingest_test(&tline, &ctx).await?;

        let mut lsn = 0x10;
        for blknum in 0..RELSEG_SIZE + 1 {
            lsn += 0x10;
            let mut m = tline.begin_modification(Lsn(lsn));
            let img = test_img(&format!("foo blk {} at {}", blknum, Lsn(lsn)));
            walingest
                .put_rel_page_image(&mut m, TESTREL_A, blknum as BlockNumber, img, &ctx)
                .await?;
            m.commit(&ctx).await?;
        }

        assert_current_logical_size(&tline, Lsn(lsn));

        assert_eq!(
            tline
                .get_rel_size(TESTREL_A, Version::at(Lsn(lsn)), &ctx)
                .await?,
            RELSEG_SIZE + 1
        );

        // Truncate one block
        lsn += 0x10;
        let mut m = tline.begin_modification(Lsn(lsn));
        walingest
            .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE, &ctx)
            .await?;
        m.commit(&ctx).await?;
        assert_eq!(
            tline
                .get_rel_size(TESTREL_A, Version::at(Lsn(lsn)), &ctx)
                .await?,
            RELSEG_SIZE
        );
        assert_current_logical_size(&tline, Lsn(lsn));

        // Truncate another block
        lsn += 0x10;
        let mut m = tline.begin_modification(Lsn(lsn));
        walingest
            .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE - 1, &ctx)
            .await?;
        m.commit(&ctx).await?;
        assert_eq!(
            tline
                .get_rel_size(TESTREL_A, Version::at(Lsn(lsn)), &ctx)
                .await?,
            RELSEG_SIZE - 1
        );
        assert_current_logical_size(&tline, Lsn(lsn));

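        // The loop below truncates one block per commit and checks the
        // reported size after every step.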
        // Truncate to 3000, and then truncate all the way down to 0, one block at a time
        // This tests the behavior at segment boundaries
        let mut size: i32 = 3000;
        while size >= 0 {
            lsn += 0x10;
            let mut m = tline.begin_modification(Lsn(lsn));
            walingest
                .put_rel_truncation(&mut m, TESTREL_A, size as BlockNumber, &ctx)
                .await?;
            m.commit(&ctx).await?;
            assert_eq!(
                tline
                    .get_rel_size(TESTREL_A, Version::at(Lsn(lsn)), &ctx)
                    .await?,
                size as BlockNumber
            );

            size -= 1;
        }
        assert_current_logical_size(&tline, Lsn(lsn));

        Ok(())
    }

    /// Replay a wal segment file taken directly from safekeepers.
    ///
    /// This test is useful for benchmarking since it allows us to profile only
    /// the walingest code in a single-threaded executor, and iterate more quickly
    /// without waiting for unrelated steps.
    #[tokio::test]
    async fn test_ingest_real_wal() {
        use postgres_ffi::WAL_SEGMENT_SIZE;
        use postgres_ffi::waldecoder::WalStreamDecoder;

        use crate::tenant::harness::*;

        // Define test data path and constants.
        //
        // Steps to reconstruct the data, if needed:
        // 1. Run the pgbench python test
        // 2. Take the first wal segment file from safekeeper
        // 3. Compress it using `zstd --long input_file`
        // 4. Copy initdb.tar.zst from local_fs_remote_storage
        // 5. Grep sk logs for "restart decoder" to get startpoint
        // 6. Run just the decoder from this test to get the endpoint.
        //    It's the last LSN the decoder will output.
        let pg_version = PgMajorVersion::PG15; // The test data was generated by pg15
        let path = "test_data/sk_wal_segment_from_pgbench";
        let wal_segment_path = format!("{path}/000000010000000000000001.zst");
        let source_initdb_path = format!("{path}/{INITDB_PATH}");
        let startpoint = Lsn::from_hex("14AEC08").unwrap();
        let _endpoint = Lsn::from_hex("1FFFF98").unwrap();

        let harness = TenantHarness::create("test_ingest_real_wal").await.unwrap();
        let span = harness
            .span()
            .in_scope(|| info_span!("timeline_span", timeline_id=%TIMELINE_ID));
        let (tenant, ctx) = harness.load().await;

        let remote_initdb_path =
            remote_initdb_archive_path(&tenant.tenant_shard_id().tenant_id, &TIMELINE_ID);
        let initdb_path = harness.remote_fs_dir.join(remote_initdb_path.get_path());

        std::fs::create_dir_all(initdb_path.parent().unwrap())
            .expect("creating test dir should work");
        std::fs::copy(source_initdb_path, initdb_path).expect("copying the initdb.tar.zst works");

        // Bootstrap a real timeline. We can't use create_test_timeline because
        // it doesn't create a real checkpoint, and Walingest::new tries to parse
        // the garbage data.
        let tline = tenant
            .bootstrap_timeline_test(TIMELINE_ID, pg_version, Some(TIMELINE_ID), &ctx)
            .await
            .unwrap();

        // We fully read and decompress this into memory before decoding
        // to get a more accurate perf profile of the decoder.
        let bytes = {
            use async_compression::tokio::bufread::ZstdDecoder;
            let file = tokio::fs::File::open(wal_segment_path).await.unwrap();
            let reader = tokio::io::BufReader::new(file);
            let decoder = ZstdDecoder::new(reader);
            let mut reader = tokio::io::BufReader::new(decoder);
            let mut buffer = Vec::new();
            tokio::io::copy_buf(&mut reader, &mut buffer).await.unwrap();
            buffer
        };

        // TODO start a profiler too
        let started_at = std::time::Instant::now();

        // Initialize walingest
        let xlogoff: usize = startpoint.segment_offset(WAL_SEGMENT_SIZE);
        let mut decoder = WalStreamDecoder::new(startpoint, pg_version);
        let mut walingest = WalIngest::new(tline.as_ref(), startpoint, &ctx)
            .await
            .unwrap();
        let mut modification = tline.begin_modification(startpoint);
        println!("decoding {} bytes", bytes.len() - xlogoff);

        // Decode and ingest wal. We process the wal in chunks because
        // that's what happens when we get bytes from safekeepers.
        for chunk in bytes[xlogoff..].chunks(50) {
            decoder.feed_bytes(chunk);
            while let Some((lsn, recdata)) = decoder.poll_decode().unwrap() {
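                // from_bytes_filtered interprets the raw record per shard; this
                // timeline has a single shard, so pull out just its entry.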
                let interpreted = InterpretedWalRecord::from_bytes_filtered(
                    recdata,
                    &[*modification.tline.get_shard_identity()],
                    lsn,
                    modification.tline.pg_version,
                )
                .unwrap()
                .remove(modification.tline.get_shard_identity())
                .unwrap();

                walingest
                    .ingest_record(interpreted, &mut modification, &ctx)
                    .instrument(span.clone())
                    .await
                    .unwrap();
            }
            modification.commit(&ctx).await.unwrap();
        }

        let duration = started_at.elapsed();
        println!("done in {duration:?}");
    }
}