From da2f4879bf41c4fed4d98665509742d1ce7b5d8d Mon Sep 17 00:00:00 2001 From: Cuong Nguyen Date: Thu, 30 Nov 2023 16:28:24 -0500 Subject: [PATCH] Remove committing mode in ingest_record --- pageserver/src/import_datadir.rs | 6 ++-- pageserver/src/pgdatadir_mapping.rs | 22 ++++++++++---- pageserver/src/tenant/timeline.rs | 3 +- .../walreceiver/walreceiver_connection.rs | 29 +++++++++++------- pageserver/src/walingest.rs | 30 +++++++------------ 5 files changed, 50 insertions(+), 40 deletions(-) diff --git a/pageserver/src/import_datadir.rs b/pageserver/src/import_datadir.rs index 5bcf607f53..fea32a6662 100644 --- a/pageserver/src/import_datadir.rs +++ b/pageserver/src/import_datadir.rs @@ -317,8 +317,9 @@ async fn import_wal( while last_lsn <= endpoint { if let Some((lsn, recdata)) = waldecoder.poll_decode()? { walingest - .ingest_record(recdata, lsn, &mut modification, &mut decoded, ctx, true) + .ingest_record(recdata, lsn, &mut modification, &mut decoded, ctx) .await?; + modification.commit(ctx).await?; last_lsn = lsn; nrecords += 1; @@ -453,8 +454,9 @@ pub async fn import_wal_from_tar( while last_lsn <= end_lsn { if let Some((lsn, recdata)) = waldecoder.poll_decode()? 
{ walingest - .ingest_record(recdata, lsn, &mut modification, &mut decoded, ctx, true) + .ingest_record(recdata, lsn, &mut modification, &mut decoded, ctx) .await?; + modification.commit(ctx).await?; last_lsn = lsn; debug!("imported record at {} (end {})", lsn, end_lsn); diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index a860aee2b9..88d997fcca 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -1389,11 +1389,15 @@ impl<'a> DatadirModification<'a> { let pending_nblocks = self.pending_nblocks; self.pending_nblocks = 0; - writer.put_batch(&self.pending_updates, ctx).await?; - self.pending_updates.clear(); + if !self.pending_updates.is_empty() { + writer.put_batch(&self.pending_updates, ctx).await?; + self.pending_updates.clear(); + } - writer.delete_batch(&self.pending_deletions).await?; - self.pending_deletions.clear(); + if !self.pending_deletions.is_empty() { + writer.delete_batch(&self.pending_deletions).await?; + self.pending_deletions.clear(); + } self.pending_lsns.push(self.lsn); for pending_lsn in self.pending_lsns.drain(..) { @@ -1411,8 +1415,8 @@ impl<'a> DatadirModification<'a> { Ok(()) } - pub(crate) fn is_empty(&self) -> bool { - self.pending_updates.is_empty() && self.pending_deletions.is_empty() + pub(crate) fn len(&self) -> usize { + self.pending_updates.len() + self.pending_deletions.len() } // Internal helper functions to batch the modifications @@ -1461,6 +1465,12 @@ impl<'a> DatadirModification<'a> { } } +/// This enum facilitates accessing either a committed key from the timeline at a +/// specific LSN, or the latest uncommitted key from a pending modification. +/// During WAL ingestion, the records from multiple LSNs may be batched in the same +/// modification before being flushed to the timeline. Hence, the routines in WalIngest +/// need to look up the keys in the modification first before looking them up in the +/// timeline to not miss the latest updates. 
#[derive(Clone, Copy)] pub enum Version<'a> { Lsn(Lsn), diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 35d7d5040b..89ee6b9c31 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -1430,7 +1430,6 @@ impl Timeline { .tenant_conf .max_lsn_wal_lag .unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag); - let ingest_batch_size = self.conf.ingest_batch_size; drop(tenant_conf_guard); let mut guard = self.walreceiver.lock().unwrap(); @@ -1446,7 +1445,7 @@ impl Timeline { max_lsn_wal_lag, auth_token: crate::config::SAFEKEEPER_AUTH_TOKEN.get().cloned(), availability_zone: self.conf.availability_zone.clone(), - ingest_batch_size, + ingest_batch_size: self.conf.ingest_batch_size, }, broker_client, ctx, diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs index 88e83fd8c4..e398d683e5 100644 --- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs @@ -26,7 +26,7 @@ use tracing::{debug, error, info, trace, warn, Instrument}; use super::TaskStateUpdate; use crate::{ context::RequestContext, - metrics::{LIVE_CONNECTIONS_COUNT, WALRECEIVER_STARTED_CONNECTIONS}, + metrics::{LIVE_CONNECTIONS_COUNT, WALRECEIVER_STARTED_CONNECTIONS, WAL_INGEST}, task_mgr, task_mgr::TaskKind, task_mgr::WALRECEIVER_RUNTIME, @@ -309,6 +309,7 @@ pub(super) async fn handle_walreceiver_connection( let mut decoded = DecodedWALRecord::default(); let mut modification = timeline.begin_modification(startlsn); let mut uncommitted_records = 0; + let mut filtered_records = 0; while let Some((lsn, recdata)) = waldecoder.poll_decode()? { // It is important to deal with the aligned records as lsn in getPage@LSN is // aligned and can be several bytes bigger. 
Without this alignment we are @@ -318,32 +319,38 @@ pub(super) async fn handle_walreceiver_connection( } // Ingest the records without immediately committing them. - walingest - .ingest_record( - recdata, - lsn, - &mut modification, - &mut decoded, - &ctx, - false, - ) + let ingested = walingest + .ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx) .await .with_context(|| format!("could not ingest record at {lsn}"))?; + if !ingested { + tracing::debug!("ingest: filtered out record @ LSN {lsn}"); + WAL_INGEST.records_filtered.inc(); + filtered_records += 1; + } fail_point!("walreceiver-after-ingest"); last_rec_lsn = lsn; + // Commit every ingest_batch_size records. Even if we filtered out + // all records, we still need to call commit to advance the LSN. uncommitted_records += 1; - // Commit every ingest_batch_size records. if uncommitted_records >= ingest_batch_size { + WAL_INGEST + .records_committed + .inc_by(uncommitted_records - filtered_records); modification.commit(&ctx).await?; uncommitted_records = 0; + filtered_records = 0; } } // Commit the remaining records. if uncommitted_records > 0 { + WAL_INGEST + .records_committed + .inc_by(uncommitted_records - filtered_records); modification.commit(&ctx).await?; } } diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index bd8dcfad86..5b08067738 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -57,7 +57,7 @@ impl WalIngest { pub async fn new( timeline: &Timeline, startpoint: Lsn, - ctx: &'_ RequestContext, + ctx: &RequestContext, ) -> anyhow::Result { // Fetch the latest checkpoint into memory, so that we can compare with it // quickly in `ingest_record` and update it when it changes. @@ -80,6 +80,8 @@ impl WalIngest { /// Helper function to parse a WAL record and call the Timeline's PUT functions for all the /// relations/pages that the record affects. 
/// + /// This function returns `true` if the record was ingested, and `false` if it was filtered out. + /// pub async fn ingest_record( &mut self, recdata: Bytes, @@ -87,10 +89,10 @@ impl WalIngest { modification: &mut DatadirModification<'_>, decoded: &mut DecodedWALRecord, ctx: &RequestContext, - commit: bool, - ) -> anyhow::Result<()> { + ) -> anyhow::Result<bool> { WAL_INGEST.records_received.inc(); let pg_version = modification.tline.pg_version; + let prev_len = modification.len(); modification.set_lsn(lsn)?; decode_wal_record(recdata, decoded, pg_version)?; @@ -399,22 +401,11 @@ impl WalIngest { self.checkpoint_modified = false; } - if modification.is_empty() { - tracing::debug!("ingest: filtered out record @ LSN {lsn}"); - WAL_INGEST.records_filtered.inc(); - modification.tline.finish_write(lsn); - } else { - WAL_INGEST.records_committed.inc(); - modification.commit(ctx).await?; - } + // Note that at this point this record is only cached in the modification + // until commit() is called to flush the data into the repository and update + // the latest LSN. - // Now that this record has been fully handled, including updating the - // checkpoint data, let the repository know that it is up-to-date to this LSN. - if commit { - modification.commit(ctx).await?; - } - - Ok(()) + Ok(modification.len() > prev_len) } /// Do not store this block, but observe it for the purposes of updating our relation size state. @@ -2253,10 +2244,11 @@ mod tests { decoder.feed_bytes(chunk); while let Some((lsn, recdata)) = decoder.poll_decode().unwrap() { walingest - .ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx, true) + .ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx) .await .unwrap(); } + modification.commit(&ctx).await.unwrap(); } let duration = started_at.elapsed();