Reduce memory allocations for page server (#2010)

## Overview

This patch reduces the number of memory allocations when running the page server under a heavy write workload. This mostly helps improve the speed of WAL record ingestion. 

## Changes
- modified `DatadirModification` to allow reuse the struct's allocated memory after each modification
- modified `decode_wal_record` to allow passing a `DecodedWALRecord` reference. This helps reuse the struct in each `decode_wal_record` call
- added a reusable buffer for serializing object inside the `InMemoryLayer::put_value` function
- added a performance test simulating a heavy write workload for testing the changes in this patch

### Semi-related changes
- remove redundant serializations when calling `DeltaLayer::put_value` during `InMemoryLayer::write_to_disk` function call [1]
- removed the info span `info_span!("processing record", lsn = %lsn)` during each WAL ingestion [2]

## Notes
- [1]: in `InMemoryLayer::write_to_disk`, a deserialization is called
  ```
  let val = Value::des(&buf)?;
  delta_layer_writer.put_value(key, *lsn, val)?;
  ``` 
  `DeltaLayer::put_value` then creates a serialization based on the previous deserialization
  ```
  let off = self.blob_writer.write_blob(&Value::ser(&val)?)?;
  ```
- [2]: related: https://github.com/neondatabase/neon/issues/733
This commit is contained in:
Thang Pham
2022-07-21 12:08:26 -04:00
committed by GitHub
parent 572ae74388
commit ed102f44d9
9 changed files with 196 additions and 109 deletions

View File

@@ -16,6 +16,7 @@ use crate::reltag::{RelTag, SlruKind};
use crate::repository::Repository;
use crate::repository::Timeline;
use crate::walingest::WalIngest;
use crate::walrecord::DecodedWALRecord;
use postgres_ffi::relfile_utils::*;
use postgres_ffi::waldecoder::*;
use postgres_ffi::xlog_utils::*;
@@ -38,7 +39,7 @@ pub fn import_timeline_from_postgres_datadir<R: Repository>(
// TODO this shoud be start_lsn, which is not necessarily equal to end_lsn (aka lsn)
// Then fishing out pg_control would be unnecessary
let mut modification = tline.begin_modification(lsn);
let mut modification = tline.begin_modification();
modification.init_empty()?;
// Import all but pg_wal
@@ -57,12 +58,12 @@ pub fn import_timeline_from_postgres_datadir<R: Repository>(
if let Some(control_file) = import_file(&mut modification, relative_path, file, len)? {
pg_control = Some(control_file);
}
modification.flush()?;
modification.flush(lsn)?;
}
}
// We're done importing all the data files.
modification.commit()?;
modification.commit(lsn)?;
// We expect the Postgres server to be shut down cleanly.
let pg_control = pg_control.context("pg_control file not found")?;
@@ -268,9 +269,11 @@ fn import_wal<R: Repository>(
waldecoder.feed_bytes(&buf);
let mut nrecords = 0;
let mut modification = tline.begin_modification();
let mut decoded = DecodedWALRecord::default();
while last_lsn <= endpoint {
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
walingest.ingest_record(tline, recdata, lsn)?;
walingest.ingest_record(recdata, lsn, &mut modification, &mut decoded)?;
last_lsn = lsn;
nrecords += 1;
@@ -300,7 +303,7 @@ pub fn import_basebackup_from_tar<R: Repository, Reader: Read>(
base_lsn: Lsn,
) -> Result<()> {
info!("importing base at {}", base_lsn);
let mut modification = tline.begin_modification(base_lsn);
let mut modification = tline.begin_modification();
modification.init_empty()?;
let mut pg_control: Option<ControlFileData> = None;
@@ -318,7 +321,7 @@ pub fn import_basebackup_from_tar<R: Repository, Reader: Read>(
// We found the pg_control file.
pg_control = Some(res);
}
modification.flush()?;
modification.flush(base_lsn)?;
}
tar::EntryType::Directory => {
debug!("directory {:?}", file_path);
@@ -332,7 +335,7 @@ pub fn import_basebackup_from_tar<R: Repository, Reader: Read>(
// sanity check: ensure that pg_control is loaded
let _pg_control = pg_control.context("pg_control file not found")?;
modification.commit()?;
modification.commit(base_lsn)?;
Ok(())
}
@@ -384,9 +387,11 @@ pub fn import_wal_from_tar<R: Repository, Reader: Read>(
waldecoder.feed_bytes(&bytes[offset..]);
let mut modification = tline.begin_modification();
let mut decoded = DecodedWALRecord::default();
while last_lsn <= end_lsn {
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
walingest.ingest_record(tline, recdata, lsn)?;
walingest.ingest_record(recdata, lsn, &mut modification, &mut decoded)?;
last_lsn = lsn;
debug!("imported record at {} (end {})", lsn, end_lsn);

View File

@@ -672,11 +672,21 @@ impl DeltaLayerWriter {
/// The values must be appended in key, lsn order.
///
pub fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> Result<()> {
self.put_value_bytes(key, lsn, &Value::ser(&val)?, val.will_init())
}
pub fn put_value_bytes(
&mut self,
key: Key,
lsn: Lsn,
val: &[u8],
will_init: bool,
) -> Result<()> {
assert!(self.lsn_range.start <= lsn);
let off = self.blob_writer.write_blob(&Value::ser(&val)?)?;
let off = self.blob_writer.write_blob(val)?;
let blob_ref = BlobRef::new(off, val.will_init());
let blob_ref = BlobRef::new(off, will_init);
let delta_key = DeltaKey::from_key_lsn(&key, lsn);
self.tree.append(&delta_key.0, blob_ref.0)?;

View File

@@ -15,6 +15,7 @@ use crate::layered_repository::storage_layer::{
use crate::repository::{Key, Value};
use crate::walrecord;
use anyhow::{bail, ensure, Result};
use std::cell::RefCell;
use std::collections::HashMap;
use tracing::*;
use utils::{
@@ -30,6 +31,12 @@ use std::ops::Range;
use std::path::PathBuf;
use std::sync::RwLock;
thread_local! {
/// A buffer for serializing object during [`InMemoryLayer::put_value`].
/// This buffer is reused for each serialization to avoid additional malloc calls.
static SER_BUFFER: RefCell<Vec<u8>> = RefCell::new(Vec::new());
}
pub struct InMemoryLayer {
conf: &'static PageServerConf,
tenantid: ZTenantId,
@@ -278,10 +285,17 @@ impl InMemoryLayer {
pub fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> Result<()> {
trace!("put_value key {} at {}/{}", key, self.timelineid, lsn);
let mut inner = self.inner.write().unwrap();
inner.assert_writeable();
let off = inner.file.write_blob(&Value::ser(val)?)?;
let off = {
SER_BUFFER.with(|x| -> Result<_> {
let mut buf = x.borrow_mut();
buf.clear();
val.ser_into(&mut (*buf))?;
let off = inner.file.write_blob(&buf)?;
Ok(off)
})?
};
let vec_map = inner.index.entry(key).or_default();
let old = vec_map.append_or_update_last(lsn, off).unwrap().0;
@@ -350,8 +364,8 @@ impl InMemoryLayer {
// Write all page versions
for (lsn, pos) in vec_map.as_slice() {
cursor.read_blob_into_buf(*pos, &mut buf)?;
let val = Value::des(&buf)?;
delta_layer_writer.put_value(key, *lsn, val)?;
let will_init = Value::des(&buf)?.will_init();
delta_layer_writer.put_value_bytes(key, *lsn, &buf, will_init)?;
}
}

View File

@@ -80,23 +80,25 @@ impl<R: Repository> DatadirTimeline<R> {
/// the timeline.
///
/// This provides a transaction-like interface to perform a bunch
/// of modifications atomically, all stamped with one LSN.
/// of modifications atomically.
///
/// To ingest a WAL record, call begin_modification(lsn) to get a
/// To ingest a WAL record, call begin_modification() to get a
/// DatadirModification object. Use the functions in the object to
/// modify the repository state, updating all the pages and metadata
/// that the WAL record affects. When you're done, call commit() to
/// commit the changes.
/// that the WAL record affects. When you're done, call commit(lsn) to
/// commit the changes. All the changes will be stamped with the specified LSN.
///
/// Calling commit(lsn) will flush all the changes and reset the state,
/// so the `DatadirModification` struct can be reused to perform the next modification.
///
/// Note that any pending modifications you make through the
/// modification object won't be visible to calls to the 'get' and list
/// functions of the timeline until you finish! And if you update the
/// same page twice, the last update wins.
///
pub fn begin_modification(&self, lsn: Lsn) -> DatadirModification<R> {
pub fn begin_modification(&self) -> DatadirModification<R> {
DatadirModification {
tline: self,
lsn,
pending_updates: HashMap::new(),
pending_deletions: Vec::new(),
pending_nblocks: 0,
@@ -533,8 +535,6 @@ pub struct DatadirModification<'a, R: Repository> {
/// in the state in 'tline' yet.
pub tline: &'a DatadirTimeline<R>,
lsn: Lsn,
// The modifications are not applied directly to the underlying key-value store.
// The put-functions add the modifications here, and they are flushed to the
// underlying key-value store by the 'finish' function.
@@ -920,7 +920,7 @@ impl<'a, R: Repository> DatadirModification<'a, R> {
/// retains all the metadata, but data pages are flushed. That's again OK
/// for bulk import, where you are just loading data pages and won't try to
/// modify the same pages twice.
pub fn flush(&mut self) -> Result<()> {
pub fn flush(&mut self, lsn: Lsn) -> Result<()> {
// Unless we have accumulated a decent amount of changes, it's not worth it
// to scan through the pending_updates list.
let pending_nblocks = self.pending_nblocks;
@@ -934,7 +934,7 @@ impl<'a, R: Repository> DatadirModification<'a, R> {
let mut result: Result<()> = Ok(());
self.pending_updates.retain(|&key, value| {
if result.is_ok() && (is_rel_block_key(key) || is_slru_block_key(key)) {
result = writer.put(key, self.lsn, value);
result = writer.put(key, lsn, value);
false
} else {
true
@@ -956,20 +956,22 @@ impl<'a, R: Repository> DatadirModification<'a, R> {
///
/// Finish this atomic update, writing all the updated keys to the
/// underlying timeline.
/// All the modifications in this atomic update are stamped by the specified LSN.
///
pub fn commit(self) -> Result<()> {
pub fn commit(&mut self, lsn: Lsn) -> Result<()> {
let writer = self.tline.tline.writer();
let pending_nblocks = self.pending_nblocks;
self.pending_nblocks = 0;
for (key, value) in self.pending_updates {
writer.put(key, self.lsn, &value)?;
for (key, value) in self.pending_updates.drain() {
writer.put(key, lsn, &value)?;
}
for key_range in self.pending_deletions {
writer.delete(key_range.clone(), self.lsn)?;
for key_range in self.pending_deletions.drain(..) {
writer.delete(key_range, lsn)?;
}
writer.finish_write(self.lsn);
writer.finish_write(lsn);
if pending_nblocks != 0 {
self.tline.current_logical_size.fetch_add(
@@ -1407,9 +1409,9 @@ pub fn create_test_timeline<R: Repository>(
) -> Result<Arc<crate::DatadirTimeline<R>>> {
let tline = repo.create_empty_timeline(timeline_id, Lsn(8))?;
let tline = DatadirTimeline::new(tline, 256 * 1024);
let mut m = tline.begin_modification(Lsn(8));
let mut m = tline.begin_modification();
m.init_empty()?;
m.commit()?;
m.commit(Lsn(8))?;
Ok(Arc::new(tline))
}

View File

@@ -78,13 +78,13 @@ impl<'a, R: Repository> WalIngest<'a, R> {
///
pub fn ingest_record(
&mut self,
timeline: &DatadirTimeline<R>,
recdata: Bytes,
lsn: Lsn,
modification: &mut DatadirModification<R>,
decoded: &mut DecodedWALRecord,
) -> Result<()> {
let mut modification = timeline.begin_modification(lsn);
decode_wal_record(recdata, decoded).context("failed decoding wal record")?;
let mut decoded = decode_wal_record(recdata).context("failed decoding wal record")?;
let mut buf = decoded.record.clone();
buf.advance(decoded.main_data_offset);
@@ -98,7 +98,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
if decoded.xl_rmid == pg_constants::RM_HEAP_ID
|| decoded.xl_rmid == pg_constants::RM_HEAP2_ID
{
self.ingest_heapam_record(&mut buf, &mut modification, &mut decoded)?;
self.ingest_heapam_record(&mut buf, modification, decoded)?;
}
// Handle other special record types
if decoded.xl_rmid == pg_constants::RM_SMGR_ID
@@ -106,19 +106,19 @@ impl<'a, R: Repository> WalIngest<'a, R> {
== pg_constants::XLOG_SMGR_CREATE
{
let create = XlSmgrCreate::decode(&mut buf);
self.ingest_xlog_smgr_create(&mut modification, &create)?;
self.ingest_xlog_smgr_create(modification, &create)?;
} else if decoded.xl_rmid == pg_constants::RM_SMGR_ID
&& (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== pg_constants::XLOG_SMGR_TRUNCATE
{
let truncate = XlSmgrTruncate::decode(&mut buf);
self.ingest_xlog_smgr_truncate(&mut modification, &truncate)?;
self.ingest_xlog_smgr_truncate(modification, &truncate)?;
} else if decoded.xl_rmid == pg_constants::RM_DBASE_ID {
if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== pg_constants::XLOG_DBASE_CREATE
{
let createdb = XlCreateDatabase::decode(&mut buf);
self.ingest_xlog_dbase_create(&mut modification, &createdb)?;
self.ingest_xlog_dbase_create(modification, &createdb)?;
} else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== pg_constants::XLOG_DBASE_DROP
{
@@ -137,7 +137,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
self.put_slru_page_image(
&mut modification,
modification,
SlruKind::Clog,
segno,
rpageno,
@@ -146,7 +146,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
} else {
assert!(info == pg_constants::CLOG_TRUNCATE);
let xlrec = XlClogTruncate::decode(&mut buf);
self.ingest_clog_truncate_record(&mut modification, &xlrec)?;
self.ingest_clog_truncate_record(modification, &xlrec)?;
}
} else if decoded.xl_rmid == pg_constants::RM_XACT_ID {
let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK;
@@ -154,7 +154,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
let parsed_xact =
XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
self.ingest_xact_record(
&mut modification,
modification,
&parsed_xact,
info == pg_constants::XLOG_XACT_COMMIT,
)?;
@@ -164,7 +164,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
let parsed_xact =
XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
self.ingest_xact_record(
&mut modification,
modification,
&parsed_xact,
info == pg_constants::XLOG_XACT_COMMIT_PREPARED,
)?;
@@ -187,7 +187,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
self.put_slru_page_image(
&mut modification,
modification,
SlruKind::MultiXactOffsets,
segno,
rpageno,
@@ -198,7 +198,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
self.put_slru_page_image(
&mut modification,
modification,
SlruKind::MultiXactMembers,
segno,
rpageno,
@@ -206,14 +206,14 @@ impl<'a, R: Repository> WalIngest<'a, R> {
)?;
} else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
let xlrec = XlMultiXactCreate::decode(&mut buf);
self.ingest_multixact_create_record(&mut modification, &xlrec)?;
self.ingest_multixact_create_record(modification, &xlrec)?;
} else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
let xlrec = XlMultiXactTruncate::decode(&mut buf);
self.ingest_multixact_truncate_record(&mut modification, &xlrec)?;
self.ingest_multixact_truncate_record(modification, &xlrec)?;
}
} else if decoded.xl_rmid == pg_constants::RM_RELMAP_ID {
let xlrec = XlRelmapUpdate::decode(&mut buf);
self.ingest_relmap_page(&mut modification, &xlrec, &decoded)?;
self.ingest_relmap_page(modification, &xlrec, decoded)?;
} else if decoded.xl_rmid == pg_constants::RM_XLOG_ID {
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
if info == pg_constants::XLOG_NEXTOID {
@@ -248,7 +248,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
// Iterate through all the blocks that the record modifies, and
// "put" a separate copy of the record for each block.
for blk in decoded.blocks.iter() {
self.ingest_decoded_block(&mut modification, lsn, &decoded, blk)?;
self.ingest_decoded_block(modification, lsn, decoded, blk)?;
}
// If checkpoint data was updated, store the new version in the repository
@@ -261,7 +261,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
// Now that this record has been fully handled, including updating the
// checkpoint data, let the repository know that it is up-to-date to this LSN
modification.commit()?;
modification.commit(lsn)?;
Ok(())
}
@@ -1069,10 +1069,10 @@ mod tests {
static ZERO_CHECKPOINT: Bytes = Bytes::from_static(&[0u8; SIZEOF_CHECKPOINT]);
fn init_walingest_test<R: Repository>(tline: &DatadirTimeline<R>) -> Result<WalIngest<R>> {
let mut m = tline.begin_modification(Lsn(0x10));
let mut m = tline.begin_modification();
m.put_checkpoint(ZERO_CHECKPOINT.clone())?;
m.put_relmap_file(0, 111, Bytes::from(""))?; // dummy relmapper file
m.commit()?;
m.commit(Lsn(0x10))?;
let walingest = WalIngest::new(tline, Lsn(0x10))?;
Ok(walingest)
@@ -1084,19 +1084,19 @@ mod tests {
let tline = create_test_timeline(repo, TIMELINE_ID)?;
let mut walingest = init_walingest_test(&tline)?;
let mut m = tline.begin_modification(Lsn(0x20));
let mut m = tline.begin_modification();
walingest.put_rel_creation(&mut m, TESTREL_A)?;
walingest.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
m.commit()?;
let mut m = tline.begin_modification(Lsn(0x30));
m.commit(Lsn(0x20))?;
let mut m = tline.begin_modification();
walingest.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 3"))?;
m.commit()?;
let mut m = tline.begin_modification(Lsn(0x40));
m.commit(Lsn(0x30))?;
let mut m = tline.begin_modification();
walingest.put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1 at 4"))?;
m.commit()?;
let mut m = tline.begin_modification(Lsn(0x50));
m.commit(Lsn(0x40))?;
let mut m = tline.begin_modification();
walingest.put_rel_page_image(&mut m, TESTREL_A, 2, TEST_IMG("foo blk 2 at 5"))?;
m.commit()?;
m.commit(Lsn(0x50))?;
assert_current_logical_size(&tline, Lsn(0x50));
@@ -1142,9 +1142,9 @@ mod tests {
);
// Truncate last block
let mut m = tline.begin_modification(Lsn(0x60));
let mut m = tline.begin_modification();
walingest.put_rel_truncation(&mut m, TESTREL_A, 2)?;
m.commit()?;
m.commit(Lsn(0x60))?;
assert_current_logical_size(&tline, Lsn(0x60));
// Check reported size and contents after truncation
@@ -1166,15 +1166,15 @@ mod tests {
);
// Truncate to zero length
let mut m = tline.begin_modification(Lsn(0x68));
let mut m = tline.begin_modification();
walingest.put_rel_truncation(&mut m, TESTREL_A, 0)?;
m.commit()?;
m.commit(Lsn(0x68))?;
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x68))?, 0);
// Extend from 0 to 2 blocks, leaving a gap
let mut m = tline.begin_modification(Lsn(0x70));
let mut m = tline.begin_modification();
walingest.put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1"))?;
m.commit()?;
m.commit(Lsn(0x70))?;
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x70))?, 2);
assert_eq!(
tline.get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x70))?,
@@ -1186,9 +1186,9 @@ mod tests {
);
// Extend a lot more, leaving a big gap that spans across segments
let mut m = tline.begin_modification(Lsn(0x80));
let mut m = tline.begin_modification();
walingest.put_rel_page_image(&mut m, TESTREL_A, 1500, TEST_IMG("foo blk 1500"))?;
m.commit()?;
m.commit(Lsn(0x80))?;
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x80))?, 1501);
for blk in 2..1500 {
assert_eq!(
@@ -1212,18 +1212,18 @@ mod tests {
let tline = create_test_timeline(repo, TIMELINE_ID)?;
let mut walingest = init_walingest_test(&tline)?;
let mut m = tline.begin_modification(Lsn(0x20));
let mut m = tline.begin_modification();
walingest.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
m.commit()?;
m.commit(Lsn(0x20))?;
// Check that rel exists and size is correct
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x20))?, true);
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x20))?, 1);
// Drop rel
let mut m = tline.begin_modification(Lsn(0x30));
let mut m = tline.begin_modification();
walingest.put_rel_drop(&mut m, TESTREL_A)?;
m.commit()?;
m.commit(Lsn(0x30))?;
// Check that rel is not visible anymore
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x30))?, false);
@@ -1232,9 +1232,9 @@ mod tests {
//assert!(tline.get_rel_size(TESTREL_A, Lsn(0x30))?.is_none());
// Re-create it
let mut m = tline.begin_modification(Lsn(0x40));
let mut m = tline.begin_modification();
walingest.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 4"))?;
m.commit()?;
m.commit(Lsn(0x40))?;
// Check that rel exists and size is correct
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x40))?, true);
@@ -1254,12 +1254,12 @@ mod tests {
// Create a 20 MB relation (the size is arbitrary)
let relsize = 20 * 1024 * 1024 / 8192;
let mut m = tline.begin_modification(Lsn(0x20));
let mut m = tline.begin_modification();
for blkno in 0..relsize {
let data = format!("foo blk {} at {}", blkno, Lsn(0x20));
walingest.put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data))?;
}
m.commit()?;
m.commit(Lsn(0x20))?;
// The relation was created at LSN 20, not visible at LSN 1 yet.
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x10))?, false);
@@ -1280,9 +1280,9 @@ mod tests {
// Truncate relation so that second segment was dropped
// - only leave one page
let mut m = tline.begin_modification(Lsn(0x60));
let mut m = tline.begin_modification();
walingest.put_rel_truncation(&mut m, TESTREL_A, 1)?;
m.commit()?;
m.commit(Lsn(0x60))?;
// Check reported size and contents after truncation
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x60))?, 1);
@@ -1310,12 +1310,12 @@ mod tests {
// Extend relation again.
// Add enough blocks to create second segment
let lsn = Lsn(0x80);
let mut m = tline.begin_modification(lsn);
let mut m = tline.begin_modification();
for blkno in 0..relsize {
let data = format!("foo blk {} at {}", blkno, lsn);
walingest.put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data))?;
}
m.commit()?;
m.commit(lsn)?;
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x80))?, true);
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x80))?, relsize);
@@ -1343,10 +1343,10 @@ mod tests {
let mut lsn = 0x10;
for blknum in 0..pg_constants::RELSEG_SIZE + 1 {
lsn += 0x10;
let mut m = tline.begin_modification(Lsn(lsn));
let mut m = tline.begin_modification();
let img = TEST_IMG(&format!("foo blk {} at {}", blknum, Lsn(lsn)));
walingest.put_rel_page_image(&mut m, TESTREL_A, blknum as BlockNumber, img)?;
m.commit()?;
m.commit(Lsn(lsn))?;
}
assert_current_logical_size(&tline, Lsn(lsn));
@@ -1358,9 +1358,9 @@ mod tests {
// Truncate one block
lsn += 0x10;
let mut m = tline.begin_modification(Lsn(lsn));
let mut m = tline.begin_modification();
walingest.put_rel_truncation(&mut m, TESTREL_A, pg_constants::RELSEG_SIZE)?;
m.commit()?;
m.commit(Lsn(lsn))?;
assert_eq!(
tline.get_rel_size(TESTREL_A, Lsn(lsn))?,
pg_constants::RELSEG_SIZE
@@ -1369,9 +1369,9 @@ mod tests {
// Truncate another block
lsn += 0x10;
let mut m = tline.begin_modification(Lsn(lsn));
let mut m = tline.begin_modification();
walingest.put_rel_truncation(&mut m, TESTREL_A, pg_constants::RELSEG_SIZE - 1)?;
m.commit()?;
m.commit(Lsn(lsn))?;
assert_eq!(
tline.get_rel_size(TESTREL_A, Lsn(lsn))?,
pg_constants::RELSEG_SIZE - 1
@@ -1383,9 +1383,9 @@ mod tests {
let mut size: i32 = 3000;
while size >= 0 {
lsn += 0x10;
let mut m = tline.begin_modification(Lsn(lsn));
let mut m = tline.begin_modification();
walingest.put_rel_truncation(&mut m, TESTREL_A, size as BlockNumber)?;
m.commit()?;
m.commit(Lsn(lsn))?;
assert_eq!(
tline.get_rel_size(TESTREL_A, Lsn(lsn))?,
size as BlockNumber

View File

@@ -23,6 +23,7 @@ use crate::{
repository::{Repository, Timeline},
tenant_mgr,
walingest::WalIngest,
walrecord::DecodedWALRecord,
};
use postgres_ffi::waldecoder::WalStreamDecoder;
use utils::{lsn::Lsn, pq_proto::ReplicationFeedback, zid::ZTenantTimelineId};
@@ -150,19 +151,25 @@ pub async fn handle_walreceiver_connection(
waldecoder.feed_bytes(data);
while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
let _enter = info_span!("processing record", lsn = %lsn).entered();
{
let mut decoded = DecodedWALRecord::default();
let mut modification = timeline.begin_modification();
while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
// let _enter = info_span!("processing record", lsn = %lsn).entered();
// It is important to deal with the aligned records as lsn in getPage@LSN is
// aligned and can be several bytes bigger. Without this alignment we are
// at risk of hitting a deadlock.
ensure!(lsn.is_aligned());
// It is important to deal with the aligned records as lsn in getPage@LSN is
// aligned and can be several bytes bigger. Without this alignment we are
// at risk of hitting a deadlock.
ensure!(lsn.is_aligned());
walingest.ingest_record(&timeline, recdata, lsn)?;
walingest
.ingest_record(recdata, lsn, &mut modification, &mut decoded)
.context("could not ingest record at {lsn}")?;
fail_point!("walreceiver-after-ingest");
fail_point!("walreceiver-after-ingest");
last_rec_lsn = lsn;
last_rec_lsn = lsn;
}
}
if !caught_up && endlsn >= end_of_wal {

View File

@@ -96,6 +96,7 @@ impl DecodedBkpBlock {
}
}
#[derive(Default)]
pub struct DecodedWALRecord {
pub xl_xid: TransactionId,
pub xl_info: u8,
@@ -505,7 +506,17 @@ impl XlMultiXactTruncate {
// block data
// ...
// main data
pub fn decode_wal_record(record: Bytes) -> Result<DecodedWALRecord, DeserializeError> {
//
//
// For performance reasons, the caller provides the DecodedWALRecord struct and the function just fills it in.
// It would be more natural for this function to return a DecodedWALRecord as return value,
// but reusing the caller-supplied struct avoids an allocation.
// This code is in the hot path for digesting incoming WAL, and is very performance sensitive.
//
pub fn decode_wal_record(
record: Bytes,
decoded: &mut DecodedWALRecord,
) -> Result<(), DeserializeError> {
let mut rnode_spcnode: u32 = 0;
let mut rnode_dbnode: u32 = 0;
let mut rnode_relnode: u32 = 0;
@@ -534,7 +545,7 @@ pub fn decode_wal_record(record: Bytes) -> Result<DecodedWALRecord, DeserializeE
let mut blocks_total_len: u32 = 0;
let mut main_data_len = 0;
let mut datatotal: u32 = 0;
let mut blocks: Vec<DecodedBkpBlock> = Vec::new();
decoded.blocks.clear();
// 2. Decode the headers.
// XLogRecordBlockHeaders if any,
@@ -713,7 +724,7 @@ pub fn decode_wal_record(record: Bytes) -> Result<DecodedWALRecord, DeserializeE
blk.blkno
);
blocks.push(blk);
decoded.blocks.push(blk);
}
_ => {
@@ -724,7 +735,7 @@ pub fn decode_wal_record(record: Bytes) -> Result<DecodedWALRecord, DeserializeE
// 3. Decode blocks.
let mut ptr = record.len() - buf.remaining();
for blk in blocks.iter_mut() {
for blk in decoded.blocks.iter_mut() {
if blk.has_image {
blk.bimg_offset = ptr as u32;
ptr += blk.bimg_len as usize;
@@ -744,14 +755,13 @@ pub fn decode_wal_record(record: Bytes) -> Result<DecodedWALRecord, DeserializeE
assert_eq!(buf.remaining(), main_data_len as usize);
}
Ok(DecodedWALRecord {
xl_xid: xlogrec.xl_xid,
xl_info: xlogrec.xl_info,
xl_rmid: xlogrec.xl_rmid,
record,
blocks,
main_data_offset,
})
decoded.xl_xid = xlogrec.xl_xid;
decoded.xl_info = xlogrec.xl_info;
decoded.xl_rmid = xlogrec.xl_rmid;
decoded.record = record;
decoded.main_data_offset = main_data_offset;
Ok(())
}
///

View File

@@ -1,5 +1,6 @@
import threading
import pytest
import time
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv
from fixtures.utils import lsn_from_hex
@@ -162,6 +163,11 @@ def test_branch_creation_before_gc(neon_simple_env: NeonEnv):
thread = threading.Thread(target=do_gc, daemon=True)
thread.start()
# because of network latency and other factors, GC iteration might be processed
# after the `create_branch` request. Add a sleep here to make sure that GC is
# always processed before.
time.sleep(1.0)
# The starting LSN is invalid as the corresponding record is scheduled to be removed by in-queue GC.
with pytest.raises(Exception, match="invalid branch start lsn"):
env.neon_cli.create_branch('b1', 'b0', tenant_id=tenant, ancestor_start_lsn=lsn)

View File

@@ -1,4 +1,6 @@
import os
import threading
import time
from typing import List
import pytest
@@ -87,3 +89,34 @@ def test_compare_pg_stats_wal_with_pgbench_default(neon_with_baseline: PgCompare
env.pg_bin.run_capture(
['pgbench', f'-T{duration}', f'--random-seed={seed}', env.pg.connstr()])
env.flush()
@pytest.mark.parametrize("n_tables", [1, 10])
@pytest.mark.parametrize("duration", get_durations_matrix(10))
def test_compare_pg_stats_wo_with_heavy_write(neon_with_baseline: PgCompare,
n_tables: int,
duration: int,
pg_stats_wo: List[PgStatTable]):
env = neon_with_baseline
with env.pg.connect().cursor() as cur:
for i in range(n_tables):
cur.execute(
f"CREATE TABLE t{i}(key serial primary key, t text default 'foooooooooooooooooooooooooooooooooooooooooooooooooooo')"
)
def start_single_table_workload(table_id: int):
start = time.time()
with env.pg.connect().cursor() as cur:
while time.time() - start < duration:
cur.execute(f"INSERT INTO t{table_id} SELECT FROM generate_series(1,1000)")
with env.record_pg_stats(pg_stats_wo):
threads = [
threading.Thread(target=start_single_table_workload, args=(i, ))
for i in range(n_tables)
]
for thread in threads:
thread.start()
for thread in threads:
thread.join()