Refactor WAL ingestion code.

Rename save_decoded_record() to ingest_record(), and move the
responsibility for decoding the record into ingest_record().

Also move the responsibility of updating the CheckPoint relish to
ingest_record(). Put it in a new WalIngest struct, to help with tracking
that.
This commit is contained in:
Heikki Linnakangas
2021-12-14 13:56:07 +02:00
parent f8f88154d5
commit 7f78e80c51
3 changed files with 688 additions and 671 deletions

View File

@@ -7,24 +7,26 @@ use std::fs::File;
use std::io::{Read, Seek, SeekFrom};
use std::path::{Path, PathBuf};
use anyhow::{anyhow, bail, Result};
use anyhow::{anyhow, bail, ensure, Result};
use bytes::Bytes;
use tracing::*;
use crate::relish::*;
use crate::repository::*;
use crate::walrecord::*;
use crate::walingest;
use crate::walingest::WalIngest;
use postgres_ffi::relfile_utils::*;
use postgres_ffi::waldecoder::*;
use postgres_ffi::xlog_utils::*;
use postgres_ffi::Oid;
use postgres_ffi::{pg_constants, CheckPoint, ControlFileData};
use postgres_ffi::{pg_constants, ControlFileData, DBState_DB_SHUTDOWNED};
use zenith_utils::lsn::Lsn;
///
/// Import all relation data pages from local disk into the repository.
///
/// This is currently only used to import a cluster freshly created by initdb.
/// The code that deals with the checkpoint would not work right if the
/// cluster was not shut down cleanly.
pub fn import_timeline_from_postgres_datadir(
path: &Path,
writer: &dyn TimelineWriter,
@@ -120,18 +122,28 @@ pub fn import_timeline_from_postgres_datadir(
}
// TODO: Scan pg_tblspc
// We're done importing all the data files.
writer.advance_last_record_lsn(lsn);
// We expect the Postgres server to be shut down cleanly.
let pg_control = pg_control.ok_or_else(|| anyhow!("pg_control file not found"))?;
ensure!(
pg_control.state == DBState_DB_SHUTDOWNED,
"Postgres cluster was not shut down cleanly"
);
ensure!(
pg_control.checkPointCopy.redo == lsn.0,
"unexpected checkpoint REDO pointer"
);
// Import WAL. This is needed even when starting from a shutdown checkpoint, because
// this reads the checkpoint record itself, advancing the tip of the timeline to
// *after* the checkpoint record. And crucially, it initializes the 'prev_lsn'
let pg_control = pg_control.ok_or_else(|| anyhow!("pg_control file not found"))?;
// *after* the checkpoint record. And crucially, it initializes the 'prev_lsn'.
import_wal(
&path.join("pg_wal"),
writer,
Lsn(pg_control.checkPointCopy.redo),
lsn,
&mut pg_control.checkPointCopy.clone(),
)?;
Ok(())
@@ -300,10 +312,9 @@ fn import_slru_file(
/// 'startpoint' and 'endpoint' into the repository.
fn import_wal(
walpath: &Path,
timeline: &dyn TimelineWriter,
writer: &dyn TimelineWriter,
startpoint: Lsn,
endpoint: Lsn,
checkpoint: &mut CheckPoint,
) -> Result<()> {
let mut waldecoder = WalStreamDecoder::new(startpoint);
@@ -311,6 +322,8 @@ fn import_wal(
let mut offset = startpoint.segment_offset(pg_constants::WAL_SEGMENT_SIZE);
let mut last_lsn = startpoint;
let mut walingest = WalIngest::new(writer, startpoint)?;
while last_lsn <= endpoint {
// FIXME: assume postgresql tli 1 for now
let filename = XLogFileName(1, segno, pg_constants::WAL_SEGMENT_SIZE);
@@ -342,32 +355,9 @@ fn import_wal(
let mut nrecords = 0;
while last_lsn <= endpoint {
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
let mut checkpoint_modified = false;
let decoded = decode_wal_record(recdata.clone());
walingest::save_decoded_record(
checkpoint,
&mut checkpoint_modified,
timeline,
&decoded,
recdata,
lsn,
)?;
walingest.ingest_record(writer, recdata, lsn)?;
last_lsn = lsn;
if checkpoint_modified {
let checkpoint_bytes = checkpoint.encode();
timeline.put_page_image(
RelishTag::Checkpoint,
0,
last_lsn,
checkpoint_bytes,
)?;
}
// Now that this record has been fully handled, including updating the
// checkpoint data, let the repository know that it is up-to-date to this LSN
timeline.advance_last_record_lsn(last_lsn);
nrecords += 1;
trace!("imported record at {} (end {})", lsn, endpoint);
@@ -381,12 +371,7 @@ fn import_wal(
}
if last_lsn != startpoint {
debug!(
"reached end of WAL at {}, updating checkpoint info",
last_lsn
);
timeline.advance_last_record_lsn(last_lsn);
debug!("reached end of WAL at {}", last_lsn);
} else {
info!("no WAL to import at {}", last_lsn);
}

File diff suppressed because it is too large Load Diff

View File

@@ -5,12 +5,10 @@
//!
//! We keep one WAL receiver active per timeline.
use crate::relish::*;
use crate::tenant_mgr;
use crate::tenant_mgr::TenantState;
use crate::tenant_threads;
use crate::walingest;
use crate::walrecord::*;
use crate::walingest::WalIngest;
use crate::PageServerConf;
use anyhow::{bail, Context, Error, Result};
use lazy_static::lazy_static;
@@ -18,7 +16,6 @@ use postgres::fallible_iterator::FallibleIterator;
use postgres::replication::ReplicationIter;
use postgres::{Client, NoTls, SimpleQueryMessage, SimpleQueryRow};
use postgres_ffi::waldecoder::*;
use postgres_ffi::*;
use postgres_protocol::message::backend::ReplicationMessage;
use postgres_types::PgLsn;
use std::cell::Cell;
@@ -240,9 +237,7 @@ fn walreceiver_main(
let mut waldecoder = WalStreamDecoder::new(startpoint);
let checkpoint_bytes = timeline.get_page_at_lsn(RelishTag::Checkpoint, 0, startpoint)?;
let mut checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);
let mut walingest = WalIngest::new(&*timeline, startpoint)?;
while let Some(replication_message) = physical_stream.next()? {
let status_update = match replication_message {
@@ -266,34 +261,8 @@ fn walreceiver_main(
assert!(lsn.is_aligned());
let writer = timeline.writer();
walingest.ingest_record(writer.as_ref(), recdata, lsn)?;
let mut checkpoint_modified = false;
let decoded = decode_wal_record(recdata.clone());
walingest::save_decoded_record(
&mut checkpoint,
&mut checkpoint_modified,
writer.as_ref(),
&decoded,
recdata,
lsn,
)?;
// Check if checkpoint data was updated by save_decoded_record
if checkpoint_modified {
let new_checkpoint_bytes = checkpoint.encode();
writer.put_page_image(
RelishTag::Checkpoint,
0,
lsn,
new_checkpoint_bytes,
)?;
}
// Now that this record has been fully handled, including updating the
// checkpoint data, let the repository know that it is up-to-date to this LSN
writer.advance_last_record_lsn(lsn);
last_rec_lsn = lsn;
}