pageserver: ingest interpreted WAL records

This commit is contained in:
Vlad Lazar
2024-11-13 13:42:41 +01:00
parent 4f3583a316
commit 3b4ffaf79a

View File

@@ -36,7 +36,7 @@ use crate::{
use postgres_backend::is_expected_io_error;
use postgres_connection::PgConnectionConfig;
use postgres_ffi::waldecoder::WalStreamDecoder;
use utils::{id::NodeId, lsn::Lsn};
use utils::{bin_ser::BeSer, id::NodeId, lsn::Lsn};
use utils::{pageserver_feedback::PageserverFeedback, sync::gate::GateError};
/// Status of the connection.
@@ -291,6 +291,15 @@ pub(super) async fn handle_walreceiver_connection(
connection_status.latest_connection_update = now;
connection_status.commit_lsn = Some(Lsn::from(keepalive.wal_end()));
}
ReplicationMessage::RawInterpretedWalRecords(raw) => {
connection_status.latest_connection_update = now;
if !raw.data().is_empty() {
connection_status.latest_wal_update = now;
}
connection_status.commit_lsn = Some(Lsn::from(raw.wal_end()));
connection_status.streaming_lsn = Some(Lsn::from(raw.streaming_lsn()));
}
&_ => {}
};
if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status)) {
@@ -298,7 +307,130 @@ pub(super) async fn handle_walreceiver_connection(
return Ok(());
}
async fn commit(
modification: &mut DatadirModification<'_>,
uncommitted: &mut u64,
filtered: &mut u64,
ctx: &RequestContext,
) -> anyhow::Result<()> {
WAL_INGEST
.records_committed
.inc_by(*uncommitted - *filtered);
modification.commit(ctx).await?;
*uncommitted = 0;
*filtered = 0;
Ok(())
}
let status_update = match replication_message {
ReplicationMessage::RawInterpretedWalRecords(raw) => {
WAL_INGEST.bytes_received.inc_by(raw.data().len() as u64);
let mut uncommitted_records = 0;
let mut filtered_records = 0;
// This is the end LSN of the raw WAL from which the records
// were interpreted.
let streaming_lsn = Lsn::from(raw.streaming_lsn());
tracing::debug!(
"Received WAL up to {streaming_lsn} with next_record_lsn={}",
Lsn(raw.next_record_lsn().unwrap_or(0))
);
let records = Vec::<InterpretedWalRecord>::des(raw.data()).with_context(|| {
anyhow::anyhow!(
"Failed to deserialize interpreted records ending at LSN {streaming_lsn}"
)
})?;
// We start the modification at 0 because each interpreted record
// advances it to its end LSN. 0 is just an initialization placeholder.
let mut modification = timeline.begin_modification(Lsn(0));
for interpreted in records {
if matches!(interpreted.flush_uncommitted, FlushUncommittedRecords::Yes)
&& uncommitted_records > 0
{
commit(
&mut modification,
&mut uncommitted_records,
&mut filtered_records,
&ctx,
)
.await?;
}
let next_record_lsn = interpreted.next_record_lsn;
let ingested = walingest
.ingest_record(interpreted, &mut modification, &ctx)
.await
.with_context(|| format!("could not ingest record at {next_record_lsn}"))?;
if !ingested {
tracing::debug!("ingest: filtered out record @ LSN {next_record_lsn}");
WAL_INGEST.records_filtered.inc();
filtered_records += 1;
}
uncommitted_records += 1;
// FIXME: this cannot be made pausable_failpoint without fixing the
// failpoint library; in tests, the added amount of debugging will cause us
// to timeout the tests.
fail_point!("walreceiver-after-ingest");
// Commit every ingest_batch_size records. Even if we filtered out
// all records, we still need to call commit to advance the LSN.
if uncommitted_records >= ingest_batch_size
|| modification.approx_pending_bytes()
> DatadirModification::MAX_PENDING_BYTES
{
commit(
&mut modification,
&mut uncommitted_records,
&mut filtered_records,
&ctx,
)
.await?;
}
}
// Records might have been filtered out on the safekeeper side, but we still
// need to advance last record LSN on all shards. If we've not ingested the latest
// record, then set the LSN of the modification past it. This way all shards
// advance their last record LSN at the same time.
let needs_last_record_lsn_advance = match raw.next_record_lsn().map(Lsn::from) {
Some(lsn) if lsn > modification.get_lsn() => {
modification.set_lsn(lsn).unwrap();
true
}
_ => false,
};
if uncommitted_records > 0 || needs_last_record_lsn_advance {
// Commit any uncommitted records
commit(
&mut modification,
&mut uncommitted_records,
&mut filtered_records,
&ctx,
)
.await?;
}
if !caught_up && streaming_lsn >= end_of_wal {
info!("caught up at LSN {streaming_lsn}");
caught_up = true;
}
tracing::debug!(
"Ingested WAL up to {streaming_lsn}. Last record LSN is {}",
timeline.get_last_record_lsn()
);
Some(streaming_lsn)
}
ReplicationMessage::XLogData(xlog_data) => {
// Pass the WAL data to the decoder, and see if we can decode
// more records as a result.
@@ -316,21 +448,6 @@ pub(super) async fn handle_walreceiver_connection(
let mut uncommitted_records = 0;
let mut filtered_records = 0;
async fn commit(
modification: &mut DatadirModification<'_>,
uncommitted: &mut u64,
filtered: &mut u64,
ctx: &RequestContext,
) -> anyhow::Result<()> {
WAL_INGEST
.records_committed
.inc_by(*uncommitted - *filtered);
modification.commit(ctx).await?;
*uncommitted = 0;
*filtered = 0;
Ok(())
}
while let Some((next_record_lsn, recdata)) = waldecoder.poll_decode()? {
// It is important to deal with the aligned records as lsn in getPage@LSN is
// aligned and can be several bytes bigger. Without this alignment we are