mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-03 19:42:55 +00:00
applied changes from neon #12126
This commit is contained in:
@@ -2914,7 +2914,6 @@ pub(crate) struct WalIngestMetrics {
|
||||
pub(crate) records_received: IntCounter,
|
||||
pub(crate) records_observed: IntCounter,
|
||||
pub(crate) records_committed: IntCounter,
|
||||
pub(crate) records_filtered: IntCounter,
|
||||
pub(crate) values_committed_metadata_images: IntCounter,
|
||||
pub(crate) values_committed_metadata_deltas: IntCounter,
|
||||
pub(crate) values_committed_data_images: IntCounter,
|
||||
@@ -2970,11 +2969,6 @@ pub(crate) static WAL_INGEST: Lazy<WalIngestMetrics> = Lazy::new(|| {
|
||||
"Number of WAL records which resulted in writes to pageserver storage"
|
||||
)
|
||||
.expect("failed to define a metric"),
|
||||
records_filtered: register_int_counter!(
|
||||
"pageserver_wal_ingest_records_filtered",
|
||||
"Number of WAL records filtered out due to sharding"
|
||||
)
|
||||
.expect("failed to define a metric"),
|
||||
values_committed_metadata_images: values_committed.with_label_values(&["metadata", "image"]),
|
||||
values_committed_metadata_deltas: values_committed.with_label_values(&["metadata", "delta"]),
|
||||
values_committed_data_images: values_committed.with_label_values(&["data", "image"]),
|
||||
|
||||
@@ -16,7 +16,6 @@ use postgres_connection::PgConnectionConfig;
|
||||
use postgres_ffi::WAL_SEGMENT_SIZE;
|
||||
use postgres_ffi::v14::xlog_utils::normalize_lsn;
|
||||
use postgres_ffi::waldecoder::WalDecodeError;
|
||||
use postgres_ffi::waldecoder::WalStreamDecoder;
|
||||
use postgres_protocol::message::backend::ReplicationMessage;
|
||||
use postgres_types::PgLsn;
|
||||
use tokio::sync::watch;
|
||||
@@ -32,7 +31,7 @@ use utils::lsn::Lsn;
|
||||
use utils::pageserver_feedback::PageserverFeedback;
|
||||
use utils::postgres_client::PostgresClientProtocol;
|
||||
use utils::sync::gate::GateError;
|
||||
use wal_decoder::models::{FlushUncommittedRecords, InterpretedWalRecord, InterpretedWalRecords};
|
||||
use wal_decoder::models::{FlushUncommittedRecords, InterpretedWalRecords};
|
||||
use wal_decoder::wire_format::FromWireFormat;
|
||||
|
||||
use super::TaskStateUpdate;
|
||||
@@ -276,8 +275,6 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
let copy_stream = replication_client.copy_both_simple(&query).await?;
|
||||
let mut physical_stream = pin!(ReplicationStream::new(copy_stream));
|
||||
|
||||
let mut waldecoder = WalStreamDecoder::new(startpoint, timeline.pg_version);
|
||||
|
||||
let mut walingest = WalIngest::new(timeline.as_ref(), startpoint, &ctx)
|
||||
.await
|
||||
.map_err(|e| match e.kind {
|
||||
@@ -285,8 +282,6 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
_ => WalReceiverError::Other(e.into()),
|
||||
})?;
|
||||
|
||||
let shard = vec![*timeline.get_shard_identity()];
|
||||
|
||||
let (format, compression) = match protocol {
|
||||
PostgresClientProtocol::Interpreted {
|
||||
format,
|
||||
@@ -517,143 +512,6 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
Some(streaming_lsn)
|
||||
}
|
||||
|
||||
ReplicationMessage::XLogData(xlog_data) => {
|
||||
async fn commit(
|
||||
modification: &mut DatadirModification<'_>,
|
||||
uncommitted: &mut u64,
|
||||
filtered: &mut u64,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
let stats = modification.stats();
|
||||
modification.commit(ctx).await?;
|
||||
WAL_INGEST
|
||||
.records_committed
|
||||
.inc_by(*uncommitted - *filtered);
|
||||
WAL_INGEST.inc_values_committed(&stats);
|
||||
*uncommitted = 0;
|
||||
*filtered = 0;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Pass the WAL data to the decoder, and see if we can decode
|
||||
// more records as a result.
|
||||
let data = xlog_data.data();
|
||||
let startlsn = Lsn::from(xlog_data.wal_start());
|
||||
let endlsn = startlsn + data.len() as u64;
|
||||
|
||||
trace!("received XLogData between {startlsn} and {endlsn}");
|
||||
|
||||
WAL_INGEST.bytes_received.inc_by(data.len() as u64);
|
||||
waldecoder.feed_bytes(data);
|
||||
|
||||
{
|
||||
let mut modification = timeline.begin_modification(startlsn);
|
||||
let mut uncommitted_records = 0;
|
||||
let mut filtered_records = 0;
|
||||
|
||||
while let Some((next_record_lsn, recdata)) = waldecoder.poll_decode()? {
|
||||
// It is important to deal with the aligned records as lsn in getPage@LSN is
|
||||
// aligned and can be several bytes bigger. Without this alignment we are
|
||||
// at risk of hitting a deadlock.
|
||||
if !next_record_lsn.is_aligned() {
|
||||
return Err(WalReceiverError::Other(anyhow!("LSN not aligned")));
|
||||
}
|
||||
|
||||
// Deserialize and interpret WAL record
|
||||
let interpreted = InterpretedWalRecord::from_bytes_filtered(
|
||||
recdata,
|
||||
&shard,
|
||||
next_record_lsn,
|
||||
modification.tline.pg_version,
|
||||
)?
|
||||
.remove(timeline.get_shard_identity())
|
||||
.unwrap();
|
||||
|
||||
if matches!(interpreted.flush_uncommitted, FlushUncommittedRecords::Yes)
|
||||
&& uncommitted_records > 0
|
||||
{
|
||||
// Special case: legacy PG database creations operate by reading pages from a 'template' database:
|
||||
// these are the only kinds of WAL record that require reading data blocks while ingesting. Ensure
|
||||
// all earlier writes of data blocks are visible by committing any modification in flight.
|
||||
commit(
|
||||
&mut modification,
|
||||
&mut uncommitted_records,
|
||||
&mut filtered_records,
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
// Ingest the records without immediately committing them.
|
||||
timeline.metrics.wal_records_received.inc();
|
||||
let ingested = walingest
|
||||
.ingest_record(interpreted, &mut modification, &ctx)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!("could not ingest record at {next_record_lsn}")
|
||||
})
|
||||
.inspect_err(|err| {
|
||||
// TODO: we can't differentiate cancellation errors with
|
||||
// anyhow::Error, so just ignore it if we're cancelled.
|
||||
if !cancellation.is_cancelled() {
|
||||
critical_timeline!(
|
||||
timeline.tenant_shard_id,
|
||||
timeline.timeline_id,
|
||||
Some(&timeline.corruption_detected),
|
||||
"{err:?}"
|
||||
)
|
||||
}
|
||||
})?;
|
||||
if !ingested {
|
||||
tracing::debug!("ingest: filtered out record @ LSN {next_record_lsn}");
|
||||
WAL_INGEST.records_filtered.inc();
|
||||
filtered_records += 1;
|
||||
}
|
||||
|
||||
// FIXME: this cannot be made pausable_failpoint without fixing the
|
||||
// failpoint library; in tests, the added amount of debugging will cause us
|
||||
// to timeout the tests.
|
||||
fail_point!("walreceiver-after-ingest");
|
||||
|
||||
last_rec_lsn = next_record_lsn;
|
||||
|
||||
// Commit every ingest_batch_size records. Even if we filtered out
|
||||
// all records, we still need to call commit to advance the LSN.
|
||||
uncommitted_records += 1;
|
||||
if uncommitted_records >= ingest_batch_size
|
||||
|| modification.approx_pending_bytes()
|
||||
> DatadirModification::MAX_PENDING_BYTES
|
||||
{
|
||||
commit(
|
||||
&mut modification,
|
||||
&mut uncommitted_records,
|
||||
&mut filtered_records,
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
// Commit the remaining records.
|
||||
if uncommitted_records > 0 {
|
||||
commit(
|
||||
&mut modification,
|
||||
&mut uncommitted_records,
|
||||
&mut filtered_records,
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
if !caught_up && endlsn >= end_of_wal {
|
||||
info!("caught up at LSN {endlsn}");
|
||||
caught_up = true;
|
||||
}
|
||||
|
||||
Some(endlsn)
|
||||
}
|
||||
|
||||
ReplicationMessage::PrimaryKeepAlive(keepalive) => {
|
||||
let wal_end = keepalive.wal_end();
|
||||
let timestamp = keepalive.timestamp();
|
||||
|
||||
Reference in New Issue
Block a user