Remove committing mode in ingest_record

This commit is contained in:
Cuong Nguyen
2023-11-30 16:28:24 -05:00
parent 30d75bfb35
commit da2f4879bf
5 changed files with 50 additions and 40 deletions

View File

@@ -317,8 +317,9 @@ async fn import_wal(
while last_lsn <= endpoint {
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
walingest
.ingest_record(recdata, lsn, &mut modification, &mut decoded, ctx, true)
.ingest_record(recdata, lsn, &mut modification, &mut decoded, ctx)
.await?;
modification.commit(ctx).await?;
last_lsn = lsn;
nrecords += 1;
@@ -453,8 +454,9 @@ pub async fn import_wal_from_tar(
while last_lsn <= end_lsn {
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
walingest
.ingest_record(recdata, lsn, &mut modification, &mut decoded, ctx, true)
.ingest_record(recdata, lsn, &mut modification, &mut decoded, ctx)
.await?;
modification.commit(ctx).await?;
last_lsn = lsn;
debug!("imported record at {} (end {})", lsn, end_lsn);

View File

@@ -1389,11 +1389,15 @@ impl<'a> DatadirModification<'a> {
let pending_nblocks = self.pending_nblocks;
self.pending_nblocks = 0;
writer.put_batch(&self.pending_updates, ctx).await?;
self.pending_updates.clear();
if !self.pending_updates.is_empty() {
writer.put_batch(&self.pending_updates, ctx).await?;
self.pending_updates.clear();
}
writer.delete_batch(&self.pending_deletions).await?;
self.pending_deletions.clear();
if !self.pending_deletions.is_empty() {
writer.delete_batch(&self.pending_deletions).await?;
self.pending_deletions.clear();
}
self.pending_lsns.push(self.lsn);
for pending_lsn in self.pending_lsns.drain(..) {
@@ -1411,8 +1415,8 @@ impl<'a> DatadirModification<'a> {
Ok(())
}
pub(crate) fn is_empty(&self) -> bool {
self.pending_updates.is_empty() && self.pending_deletions.is_empty()
pub(crate) fn len(&self) -> usize {
self.pending_updates.len() + self.pending_deletions.len()
}
// Internal helper functions to batch the modifications
@@ -1461,6 +1465,12 @@ impl<'a> DatadirModification<'a> {
}
}
/// This struct facilitates accessing either a committed key from the timeline at a
/// specific LSN, or the latest uncommitted key from a pending modification.
/// During WAL ingestion, the records from multiple LSNs may be batched in the same
/// modification before being flushed to the timeline. Hence, the routines in WalIngest
/// need to look up the keys in the modification first before looking them up in the
/// timeline to not miss the latest updates.
#[derive(Clone, Copy)]
pub enum Version<'a> {
Lsn(Lsn),

View File

@@ -1430,7 +1430,6 @@ impl Timeline {
.tenant_conf
.max_lsn_wal_lag
.unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag);
let ingest_batch_size = self.conf.ingest_batch_size;
drop(tenant_conf_guard);
let mut guard = self.walreceiver.lock().unwrap();
@@ -1446,7 +1445,7 @@ impl Timeline {
max_lsn_wal_lag,
auth_token: crate::config::SAFEKEEPER_AUTH_TOKEN.get().cloned(),
availability_zone: self.conf.availability_zone.clone(),
ingest_batch_size,
ingest_batch_size: self.conf.ingest_batch_size,
},
broker_client,
ctx,

View File

@@ -26,7 +26,7 @@ use tracing::{debug, error, info, trace, warn, Instrument};
use super::TaskStateUpdate;
use crate::{
context::RequestContext,
metrics::{LIVE_CONNECTIONS_COUNT, WALRECEIVER_STARTED_CONNECTIONS},
metrics::{LIVE_CONNECTIONS_COUNT, WALRECEIVER_STARTED_CONNECTIONS, WAL_INGEST},
task_mgr,
task_mgr::TaskKind,
task_mgr::WALRECEIVER_RUNTIME,
@@ -309,6 +309,7 @@ pub(super) async fn handle_walreceiver_connection(
let mut decoded = DecodedWALRecord::default();
let mut modification = timeline.begin_modification(startlsn);
let mut uncommitted_records = 0;
let mut filtered_records = 0;
while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
// It is important to deal with the aligned records as lsn in getPage@LSN is
// aligned and can be several bytes bigger. Without this alignment we are
@@ -318,32 +319,38 @@ pub(super) async fn handle_walreceiver_connection(
}
// Ingest the records without immediately committing them.
walingest
.ingest_record(
recdata,
lsn,
&mut modification,
&mut decoded,
&ctx,
false,
)
let ingested = walingest
.ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx)
.await
.with_context(|| format!("could not ingest record at {lsn}"))?;
if !ingested {
tracing::debug!("ingest: filtered out record @ LSN {lsn}");
WAL_INGEST.records_filtered.inc();
filtered_records += 1;
}
fail_point!("walreceiver-after-ingest");
last_rec_lsn = lsn;
// Commit every ingest_batch_size records. Even if we filtered out
// all records, we still need to call commit to advance the LSN.
uncommitted_records += 1;
// Commit every ingest_batch_size records.
if uncommitted_records >= ingest_batch_size {
WAL_INGEST
.records_committed
.inc_by(uncommitted_records - filtered_records);
modification.commit(&ctx).await?;
uncommitted_records = 0;
filtered_records = 0;
}
}
// Commit the remaining records.
if uncommitted_records > 0 {
WAL_INGEST
.records_committed
.inc_by(uncommitted_records - filtered_records);
modification.commit(&ctx).await?;
}
}

View File

@@ -57,7 +57,7 @@ impl WalIngest {
pub async fn new(
timeline: &Timeline,
startpoint: Lsn,
ctx: &'_ RequestContext,
ctx: &RequestContext,
) -> anyhow::Result<WalIngest> {
// Fetch the latest checkpoint into memory, so that we can compare with it
// quickly in `ingest_record` and update it when it changes.
@@ -80,6 +80,8 @@ impl WalIngest {
/// Helper function to parse a WAL record and call the Timeline's PUT functions for all the
/// relations/pages that the record affects.
///
/// This function returns `true` if the record was ingested, and `false` if it was filtered out
///
pub async fn ingest_record(
&mut self,
recdata: Bytes,
@@ -87,10 +89,10 @@ impl WalIngest {
modification: &mut DatadirModification<'_>,
decoded: &mut DecodedWALRecord,
ctx: &RequestContext,
commit: bool,
) -> anyhow::Result<()> {
) -> anyhow::Result<bool> {
WAL_INGEST.records_received.inc();
let pg_version = modification.tline.pg_version;
let prev_len = modification.len();
modification.set_lsn(lsn)?;
decode_wal_record(recdata, decoded, pg_version)?;
@@ -399,22 +401,11 @@ impl WalIngest {
self.checkpoint_modified = false;
}
if modification.is_empty() {
tracing::debug!("ingest: filtered out record @ LSN {lsn}");
WAL_INGEST.records_filtered.inc();
modification.tline.finish_write(lsn);
} else {
WAL_INGEST.records_committed.inc();
modification.commit(ctx).await?;
}
// Note that at this point this record is only cached in the modification
// until commit() is called to flush the data into the repository and update
// the latest LSN.
// Now that this record has been fully handled, including updating the
// checkpoint data, let the repository know that it is up-to-date to this LSN.
if commit {
modification.commit(ctx).await?;
}
Ok(())
Ok(modification.len() > prev_len)
}
/// Do not store this block, but observe it for the purposes of updating our relation size state.
@@ -2253,10 +2244,11 @@ mod tests {
decoder.feed_bytes(chunk);
while let Some((lsn, recdata)) = decoder.poll_decode().unwrap() {
walingest
.ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx, true)
.ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx)
.await
.unwrap();
}
modification.commit(&ctx).await.unwrap();
}
let duration = started_at.elapsed();