mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-31 12:00:42 +00:00
wip: avoid memory allocation during WAL ingestion
This commit is contained in:
@@ -62,6 +62,7 @@ pub fn import_timeline_from_postgres_datadir<R: Repository>(
|
||||
|
||||
// We're done importing all the data files.
|
||||
modification.commit()?;
|
||||
drop(modification);
|
||||
|
||||
// We expect the Postgres server to be shut down cleanly.
|
||||
let pg_control = pg_control.context("pg_control file not found")?;
|
||||
@@ -269,7 +270,8 @@ fn import_wal<R: Repository>(
|
||||
let mut nrecords = 0;
|
||||
while last_lsn <= endpoint {
|
||||
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
|
||||
walingest.ingest_record(tline, recdata, lsn)?;
|
||||
let mut modification = tline.begin_modification(lsn);
|
||||
walingest.ingest_record(recdata, lsn, &mut modification)?;
|
||||
last_lsn = lsn;
|
||||
|
||||
nrecords += 1;
|
||||
@@ -384,7 +386,8 @@ pub fn import_wal_from_tar<R: Repository, Reader: Read>(
|
||||
|
||||
while last_lsn <= end_lsn {
|
||||
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
|
||||
walingest.ingest_record(tline, recdata, lsn)?;
|
||||
let mut modification = tline.begin_modification(lsn);
|
||||
walingest.ingest_record(recdata, lsn, &mut modification)?;
|
||||
last_lsn = lsn;
|
||||
|
||||
debug!("imported record at {} (end {})", lsn, end_lsn);
|
||||
|
||||
@@ -99,6 +99,7 @@ impl<R: Repository> DatadirTimeline<R> {
|
||||
pending_updates: HashMap::new(),
|
||||
pending_deletions: Vec::new(),
|
||||
pending_nblocks: 0,
|
||||
writer: self.tline.writer(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -532,7 +533,7 @@ pub struct DatadirModification<'a, R: Repository> {
|
||||
/// in the state in 'tline' yet.
|
||||
pub tline: &'a DatadirTimeline<R>,
|
||||
|
||||
lsn: Lsn,
|
||||
pub lsn: Lsn,
|
||||
|
||||
// The modifications are not applied directly to the underlying key-value store.
|
||||
// The put-functions add the modifications here, and they are flushed to the
|
||||
@@ -540,9 +541,17 @@ pub struct DatadirModification<'a, R: Repository> {
|
||||
pending_updates: HashMap<Key, Value>,
|
||||
pending_deletions: Vec<Range<Key>>,
|
||||
pending_nblocks: isize,
|
||||
|
||||
writer: Box<dyn TimelineWriter<'a> + 'a>,
|
||||
}
|
||||
|
||||
impl<'a, R: Repository> DatadirModification<'a, R> {
|
||||
pub fn clear(&mut self) {
|
||||
self.pending_updates.clear();
|
||||
self.pending_deletions.clear();
|
||||
self.pending_nblocks = 0;
|
||||
}
|
||||
|
||||
/// Initialize a completely new repository.
|
||||
///
|
||||
/// This inserts the directory metadata entries that are assumed to
|
||||
@@ -905,19 +914,17 @@ impl<'a, R: Repository> DatadirModification<'a, R> {
|
||||
/// Finish this atomic update, writing all the updated keys to the
|
||||
/// underlying timeline.
|
||||
///
|
||||
pub fn commit(self) -> Result<()> {
|
||||
let writer = self.tline.tline.writer();
|
||||
|
||||
pub fn commit(&self) -> Result<()> {
|
||||
let pending_nblocks = self.pending_nblocks;
|
||||
|
||||
for (key, value) in self.pending_updates {
|
||||
writer.put(key, self.lsn, value)?;
|
||||
for (key, value) in &self.pending_updates {
|
||||
self.writer.put(key.clone(), self.lsn, value.clone())?;
|
||||
}
|
||||
for key_range in self.pending_deletions {
|
||||
writer.delete(key_range.clone(), self.lsn)?;
|
||||
for key_range in &self.pending_deletions {
|
||||
self.writer.delete(key_range.clone(), self.lsn)?;
|
||||
}
|
||||
|
||||
writer.finish_write(self.lsn);
|
||||
self.writer.finish_write(self.lsn);
|
||||
|
||||
if pending_nblocks != 0 {
|
||||
self.tline.current_logical_size.fetch_add(
|
||||
|
||||
@@ -78,12 +78,10 @@ impl<'a, R: Repository> WalIngest<'a, R> {
|
||||
///
|
||||
pub fn ingest_record(
|
||||
&mut self,
|
||||
timeline: &DatadirTimeline<R>,
|
||||
recdata: Bytes,
|
||||
lsn: Lsn,
|
||||
modification: &mut DatadirModification<R>,
|
||||
) -> Result<()> {
|
||||
let mut modification = timeline.begin_modification(lsn);
|
||||
|
||||
let mut decoded = decode_wal_record(recdata).context("failed decoding wal record")?;
|
||||
let mut buf = decoded.record.clone();
|
||||
buf.advance(decoded.main_data_offset);
|
||||
@@ -98,7 +96,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
|
||||
if decoded.xl_rmid == pg_constants::RM_HEAP_ID
|
||||
|| decoded.xl_rmid == pg_constants::RM_HEAP2_ID
|
||||
{
|
||||
self.ingest_heapam_record(&mut buf, &mut modification, &mut decoded)?;
|
||||
self.ingest_heapam_record(&mut buf, modification, &mut decoded)?;
|
||||
}
|
||||
// Handle other special record types
|
||||
if decoded.xl_rmid == pg_constants::RM_SMGR_ID
|
||||
@@ -106,19 +104,19 @@ impl<'a, R: Repository> WalIngest<'a, R> {
|
||||
== pg_constants::XLOG_SMGR_CREATE
|
||||
{
|
||||
let create = XlSmgrCreate::decode(&mut buf);
|
||||
self.ingest_xlog_smgr_create(&mut modification, &create)?;
|
||||
self.ingest_xlog_smgr_create(modification, &create)?;
|
||||
} else if decoded.xl_rmid == pg_constants::RM_SMGR_ID
|
||||
&& (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
|
||||
== pg_constants::XLOG_SMGR_TRUNCATE
|
||||
{
|
||||
let truncate = XlSmgrTruncate::decode(&mut buf);
|
||||
self.ingest_xlog_smgr_truncate(&mut modification, &truncate)?;
|
||||
self.ingest_xlog_smgr_truncate(modification, &truncate)?;
|
||||
} else if decoded.xl_rmid == pg_constants::RM_DBASE_ID {
|
||||
if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
|
||||
== pg_constants::XLOG_DBASE_CREATE
|
||||
{
|
||||
let createdb = XlCreateDatabase::decode(&mut buf);
|
||||
self.ingest_xlog_dbase_create(&mut modification, &createdb)?;
|
||||
self.ingest_xlog_dbase_create(modification, &createdb)?;
|
||||
} else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
|
||||
== pg_constants::XLOG_DBASE_DROP
|
||||
{
|
||||
@@ -137,7 +135,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
|
||||
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
|
||||
let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
|
||||
self.put_slru_page_image(
|
||||
&mut modification,
|
||||
modification,
|
||||
SlruKind::Clog,
|
||||
segno,
|
||||
rpageno,
|
||||
@@ -146,7 +144,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
|
||||
} else {
|
||||
assert!(info == pg_constants::CLOG_TRUNCATE);
|
||||
let xlrec = XlClogTruncate::decode(&mut buf);
|
||||
self.ingest_clog_truncate_record(&mut modification, &xlrec)?;
|
||||
self.ingest_clog_truncate_record(modification, &xlrec)?;
|
||||
}
|
||||
} else if decoded.xl_rmid == pg_constants::RM_XACT_ID {
|
||||
let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK;
|
||||
@@ -154,7 +152,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
|
||||
let parsed_xact =
|
||||
XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
|
||||
self.ingest_xact_record(
|
||||
&mut modification,
|
||||
modification,
|
||||
&parsed_xact,
|
||||
info == pg_constants::XLOG_XACT_COMMIT,
|
||||
)?;
|
||||
@@ -164,7 +162,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
|
||||
let parsed_xact =
|
||||
XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
|
||||
self.ingest_xact_record(
|
||||
&mut modification,
|
||||
modification,
|
||||
&parsed_xact,
|
||||
info == pg_constants::XLOG_XACT_COMMIT_PREPARED,
|
||||
)?;
|
||||
@@ -187,7 +185,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
|
||||
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
|
||||
let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
|
||||
self.put_slru_page_image(
|
||||
&mut modification,
|
||||
modification,
|
||||
SlruKind::MultiXactOffsets,
|
||||
segno,
|
||||
rpageno,
|
||||
@@ -198,7 +196,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
|
||||
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
|
||||
let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
|
||||
self.put_slru_page_image(
|
||||
&mut modification,
|
||||
modification,
|
||||
SlruKind::MultiXactMembers,
|
||||
segno,
|
||||
rpageno,
|
||||
@@ -206,14 +204,14 @@ impl<'a, R: Repository> WalIngest<'a, R> {
|
||||
)?;
|
||||
} else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
|
||||
let xlrec = XlMultiXactCreate::decode(&mut buf);
|
||||
self.ingest_multixact_create_record(&mut modification, &xlrec)?;
|
||||
self.ingest_multixact_create_record(modification, &xlrec)?;
|
||||
} else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
|
||||
let xlrec = XlMultiXactTruncate::decode(&mut buf);
|
||||
self.ingest_multixact_truncate_record(&mut modification, &xlrec)?;
|
||||
self.ingest_multixact_truncate_record(modification, &xlrec)?;
|
||||
}
|
||||
} else if decoded.xl_rmid == pg_constants::RM_RELMAP_ID {
|
||||
let xlrec = XlRelmapUpdate::decode(&mut buf);
|
||||
self.ingest_relmap_page(&mut modification, &xlrec, &decoded)?;
|
||||
self.ingest_relmap_page(modification, &xlrec, &decoded)?;
|
||||
} else if decoded.xl_rmid == pg_constants::RM_XLOG_ID {
|
||||
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
|
||||
if info == pg_constants::XLOG_NEXTOID {
|
||||
@@ -248,7 +246,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
|
||||
// Iterate through all the blocks that the record modifies, and
|
||||
// "put" a separate copy of the record for each block.
|
||||
for blk in decoded.blocks.iter() {
|
||||
self.ingest_decoded_block(&mut modification, lsn, &decoded, blk)?;
|
||||
self.ingest_decoded_block(modification, lsn, &decoded, blk)?;
|
||||
}
|
||||
|
||||
// If checkpoint data was updated, store the new version in the repository
|
||||
|
||||
@@ -150,20 +150,34 @@ pub async fn handle_walreceiver_connection(
|
||||
|
||||
waldecoder.feed_bytes(data);
|
||||
|
||||
while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
|
||||
let _enter = info_span!("processing record", lsn = %lsn).entered();
|
||||
// let mut n_records = 0;
|
||||
// timer = std::time::Instant::now();
|
||||
{
|
||||
let mut modification = timeline.begin_modification(last_rec_lsn);
|
||||
|
||||
// It is important to deal with the aligned records as lsn in getPage@LSN is
|
||||
// aligned and can be several bytes bigger. Without this alignment we are
|
||||
// at risk of hitting a deadlock.
|
||||
ensure!(lsn.is_aligned());
|
||||
while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
|
||||
let _enter = info_span!("processing record", lsn = %lsn).entered();
|
||||
|
||||
walingest.ingest_record(&timeline, recdata, lsn)?;
|
||||
// It is important to deal with the aligned records as lsn in getPage@LSN is
|
||||
// aligned and can be several bytes bigger. Without this alignment we are
|
||||
// at risk of hitting a deadlock.
|
||||
ensure!(lsn.is_aligned());
|
||||
|
||||
fail_point!("walreceiver-after-ingest");
|
||||
modification.clear();
|
||||
modification.lsn = lsn;
|
||||
walingest.ingest_record(recdata, lsn, &mut modification)?;
|
||||
|
||||
last_rec_lsn = lsn;
|
||||
fail_point!("walreceiver-after-ingest");
|
||||
|
||||
last_rec_lsn = lsn;
|
||||
|
||||
// n_records += 1;
|
||||
}
|
||||
}
|
||||
// info!(
|
||||
// "Processing {n_records} records took {}us",
|
||||
// timer.elapsed().as_micros()
|
||||
// );
|
||||
|
||||
if !caught_up && endlsn >= end_of_wal {
|
||||
info!("caught up at LSN {endlsn}");
|
||||
|
||||
Reference in New Issue
Block a user