Compare commits

...

20 Commits

Author SHA1 Message Date
Thang Pham
0962131f6c test 2022-06-30 12:35:33 -04:00
Thang Pham
45fe1ecac9 test 2022-06-30 12:35:32 -04:00
Thang Pham
aa6f6acae9 test 2022-06-30 12:35:31 -04:00
Thang Pham
ebfa1d8109 test 2022-06-30 12:35:30 -04:00
Thang Pham
9ec7be082a test 2022-06-30 12:35:29 -04:00
Thang Pham
00404c8210 update test 2022-06-30 12:32:22 -04:00
Thang Pham
e44655b265 add error context to ingest_record 2022-06-30 10:21:19 -04:00
Thang Pham
cf96e484f3 Merge remote-tracking branch 'origin/main' into thang/reduce-mem-alloc 2022-06-29 20:30:45 -04:00
Thang Pham
2eeb80b071 move decoded and modification to be inside the loop 2022-06-29 17:05:49 -04:00
Thang Pham
0e2462e5f2 cleanup 2022-06-29 15:57:45 -04:00
Thang Pham
42249823aa avoid additional serialize call 2022-06-29 15:23:27 -04:00
Thang Pham
63b3195a31 add a serialize buffer for InMemoryLayer 2022-06-29 14:52:16 -04:00
Thang Pham
4c4e9200e0 update test 2022-06-29 11:00:10 -04:00
Thang Pham
6d64fa77f7 avoid coupling DatadirModification with lsn 2022-06-29 10:57:01 -04:00
Thang Pham
6571752508 fix test 2022-06-28 17:14:44 -04:00
Thang Pham
9cf186a9cb cleanup 2022-06-28 16:56:41 -04:00
Thang Pham
b71bd90a37 update tests 2022-06-28 16:53:21 -04:00
Thang Pham
b5d9e5b06f reuse previous decoded wal record when decoding a new one 2022-06-28 16:40:48 -04:00
Thang Pham
9bc2287bcf update test 2022-06-28 15:10:48 -04:00
Thang Pham
eb424f851b wip: avoid memory allocation during WAL ingestion 2022-06-28 14:58:44 -04:00
8 changed files with 178 additions and 106 deletions

View File

@@ -16,6 +16,7 @@ use crate::reltag::{RelTag, SlruKind};
use crate::repository::Repository;
use crate::repository::Timeline;
use crate::walingest::WalIngest;
use crate::walrecord::DecodedWALRecord;
use postgres_ffi::relfile_utils::*;
use postgres_ffi::waldecoder::*;
use postgres_ffi::xlog_utils::*;
@@ -38,7 +39,7 @@ pub fn import_timeline_from_postgres_datadir<R: Repository>(
// TODO this should be start_lsn, which is not necessarily equal to end_lsn (aka lsn)
// Then fishing out pg_control would be unnecessary
let mut modification = tline.begin_modification(lsn);
let mut modification = tline.begin_modification();
modification.init_empty()?;
// Import all but pg_wal
@@ -61,7 +62,7 @@ pub fn import_timeline_from_postgres_datadir<R: Repository>(
}
// We're done importing all the data files.
modification.commit()?;
modification.commit(lsn)?;
// We expect the Postgres server to be shut down cleanly.
let pg_control = pg_control.context("pg_control file not found")?;
@@ -267,9 +268,11 @@ fn import_wal<R: Repository>(
waldecoder.feed_bytes(&buf);
let mut nrecords = 0;
let mut modification = tline.begin_modification();
let mut decoded = DecodedWALRecord::default();
while last_lsn <= endpoint {
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
walingest.ingest_record(tline, recdata, lsn)?;
walingest.ingest_record(recdata, lsn, &mut modification, &mut decoded)?;
last_lsn = lsn;
nrecords += 1;
@@ -299,7 +302,7 @@ pub fn import_basebackup_from_tar<R: Repository, Reader: Read>(
base_lsn: Lsn,
) -> Result<()> {
info!("importing base at {}", base_lsn);
let mut modification = tline.begin_modification(base_lsn);
let mut modification = tline.begin_modification();
modification.init_empty()?;
let mut pg_control: Option<ControlFileData> = None;
@@ -330,7 +333,7 @@ pub fn import_basebackup_from_tar<R: Repository, Reader: Read>(
// sanity check: ensure that pg_control is loaded
let _pg_control = pg_control.context("pg_control file not found")?;
modification.commit()?;
modification.commit(base_lsn)?;
Ok(())
}
@@ -382,9 +385,11 @@ pub fn import_wal_from_tar<R: Repository, Reader: Read>(
waldecoder.feed_bytes(&bytes[offset..]);
let mut modification = tline.begin_modification();
let mut decoded = DecodedWALRecord::default();
while last_lsn <= end_lsn {
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
walingest.ingest_record(tline, recdata, lsn)?;
walingest.ingest_record(recdata, lsn, &mut modification, &mut decoded)?;
last_lsn = lsn;
debug!("imported record at {} (end {})", lsn, end_lsn);

View File

@@ -660,11 +660,21 @@ impl DeltaLayerWriter {
/// The values must be appended in key, lsn order.
///
pub fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> Result<()> {
self.put_value_bytes(key, lsn, &Value::ser(&val)?, val.will_init())
}
pub fn put_value_bytes(
&mut self,
key: Key,
lsn: Lsn,
val: &[u8],
will_init: bool,
) -> Result<()> {
assert!(self.lsn_range.start <= lsn);
let off = self.blob_writer.write_blob(&Value::ser(&val)?)?;
let off = self.blob_writer.write_blob(val)?;
let blob_ref = BlobRef::new(off, val.will_init());
let blob_ref = BlobRef::new(off, will_init);
let delta_key = DeltaKey::from_key_lsn(&key, lsn);
self.tree.append(&delta_key.0, blob_ref.0)?;

View File

@@ -28,7 +28,7 @@ use utils::{
use std::fmt::Write as _;
use std::ops::Range;
use std::path::PathBuf;
use std::sync::RwLock;
use std::sync::{Mutex, RwLock};
pub struct InMemoryLayer {
conf: &'static PageServerConf,
@@ -41,6 +41,10 @@ pub struct InMemoryLayer {
///
start_lsn: Lsn,
/// A buffer for serializing object during [`InMemoryLayer::put_value`].
/// This buffer is reused for each serialization to avoid additional malloc calls.
ser_buffer: Mutex<Vec<u8>>,
/// The above fields never change. The parts that do change are in 'inner',
/// and protected by mutex.
inner: RwLock<InMemoryLayerInner>,
@@ -255,6 +259,7 @@ impl InMemoryLayer {
timelineid,
tenantid,
start_lsn,
ser_buffer: Mutex::new(Vec::new()),
inner: RwLock::new(InMemoryLayerInner {
end_lsn: None,
index: HashMap::new(),
@@ -270,10 +275,15 @@ impl InMemoryLayer {
pub fn put_value(&self, key: Key, lsn: Lsn, val: Value) -> Result<()> {
trace!("put_value key {} at {}/{}", key, self.timelineid, lsn);
let mut inner = self.inner.write().unwrap();
inner.assert_writeable();
let off = inner.file.write_blob(&Value::ser(&val)?)?;
let off = {
let mut buf = self.ser_buffer.lock().unwrap();
val.ser_into(&mut (*buf))?;
let off = inner.file.write_blob(&buf)?;
buf.clear();
off
};
let vec_map = inner.index.entry(key).or_default();
let old = vec_map.append_or_update_last(lsn, off).unwrap().0;
@@ -342,8 +352,8 @@ impl InMemoryLayer {
// Write all page versions
for (lsn, pos) in vec_map.as_slice() {
cursor.read_blob_into_buf(*pos, &mut buf)?;
let val = Value::des(&buf)?;
delta_layer_writer.put_value(key, *lsn, val)?;
let will_init = Value::des(&buf)?.will_init();
delta_layer_writer.put_value_bytes(key, *lsn, &buf, will_init)?;
}
}

View File

@@ -79,23 +79,25 @@ impl<R: Repository> DatadirTimeline<R> {
/// the timeline.
///
/// This provides a transaction-like interface to perform a bunch
/// of modifications atomically, all stamped with one LSN.
/// of modifications atomically.
///
/// To ingest a WAL record, call begin_modification(lsn) to get a
/// To ingest a WAL record, call begin_modification() to get a
/// DatadirModification object. Use the functions in the object to
/// modify the repository state, updating all the pages and metadata
/// that the WAL record affects. When you're done, call commit() to
/// commit the changes.
/// that the WAL record affects. When you're done, call commit(lsn) to
/// commit the changes. All the changes will be stamped with the specified LSN.
///
/// Calling commit(lsn) will flush all the changes and reset the state,
/// so the `DatadirModification` struct can be reused to perform the next modification.
///
/// Note that any pending modifications you make through the
/// modification object won't be visible to calls to the 'get' and list
/// functions of the timeline until you finish! And if you update the
/// same page twice, the last update wins.
///
pub fn begin_modification(&self, lsn: Lsn) -> DatadirModification<R> {
pub fn begin_modification(&self) -> DatadirModification<R> {
DatadirModification {
tline: self,
lsn,
pending_updates: HashMap::new(),
pending_deletions: Vec::new(),
pending_nblocks: 0,
@@ -532,8 +534,6 @@ pub struct DatadirModification<'a, R: Repository> {
/// in the state in 'tline' yet.
pub tline: &'a DatadirTimeline<R>,
lsn: Lsn,
// The modifications are not applied directly to the underlying key-value store.
// The put-functions add the modifications here, and they are flushed to the
// underlying key-value store by the 'finish' function.
@@ -904,20 +904,22 @@ impl<'a, R: Repository> DatadirModification<'a, R> {
///
/// Finish this atomic update, writing all the updated keys to the
/// underlying timeline.
/// All the modifications in this atomic update are stamped by the specified LSN.
///
pub fn commit(self) -> Result<()> {
pub fn commit(&mut self, lsn: Lsn) -> Result<()> {
let writer = self.tline.tline.writer();
let pending_nblocks = self.pending_nblocks;
self.pending_nblocks = 0;
for (key, value) in self.pending_updates {
writer.put(key, self.lsn, value)?;
for (key, value) in self.pending_updates.drain() {
writer.put(key, lsn, value)?;
}
for key_range in self.pending_deletions {
writer.delete(key_range.clone(), self.lsn)?;
for key_range in self.pending_deletions.drain(..) {
writer.delete(key_range, lsn)?;
}
writer.finish_write(self.lsn);
writer.finish_write(lsn);
if pending_nblocks != 0 {
self.tline.current_logical_size.fetch_add(
@@ -1345,9 +1347,9 @@ pub fn create_test_timeline<R: Repository>(
) -> Result<Arc<crate::DatadirTimeline<R>>> {
let tline = repo.create_empty_timeline(timeline_id, Lsn(8))?;
let tline = DatadirTimeline::new(tline, 256 * 1024);
let mut m = tline.begin_modification(Lsn(8));
let mut m = tline.begin_modification();
m.init_empty()?;
m.commit()?;
m.commit(Lsn(8))?;
Ok(Arc::new(tline))
}

View File

@@ -78,13 +78,13 @@ impl<'a, R: Repository> WalIngest<'a, R> {
///
pub fn ingest_record(
&mut self,
timeline: &DatadirTimeline<R>,
recdata: Bytes,
lsn: Lsn,
modification: &mut DatadirModification<R>,
decoded: &mut DecodedWALRecord,
) -> Result<()> {
let mut modification = timeline.begin_modification(lsn);
decode_wal_record(recdata, decoded).context("failed decoding wal record")?;
let mut decoded = decode_wal_record(recdata).context("failed decoding wal record")?;
let mut buf = decoded.record.clone();
buf.advance(decoded.main_data_offset);
@@ -98,7 +98,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
if decoded.xl_rmid == pg_constants::RM_HEAP_ID
|| decoded.xl_rmid == pg_constants::RM_HEAP2_ID
{
self.ingest_heapam_record(&mut buf, &mut modification, &mut decoded)?;
self.ingest_heapam_record(&mut buf, modification, decoded)?;
}
// Handle other special record types
if decoded.xl_rmid == pg_constants::RM_SMGR_ID
@@ -106,19 +106,19 @@ impl<'a, R: Repository> WalIngest<'a, R> {
== pg_constants::XLOG_SMGR_CREATE
{
let create = XlSmgrCreate::decode(&mut buf);
self.ingest_xlog_smgr_create(&mut modification, &create)?;
self.ingest_xlog_smgr_create(modification, &create)?;
} else if decoded.xl_rmid == pg_constants::RM_SMGR_ID
&& (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== pg_constants::XLOG_SMGR_TRUNCATE
{
let truncate = XlSmgrTruncate::decode(&mut buf);
self.ingest_xlog_smgr_truncate(&mut modification, &truncate)?;
self.ingest_xlog_smgr_truncate(modification, &truncate)?;
} else if decoded.xl_rmid == pg_constants::RM_DBASE_ID {
if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== pg_constants::XLOG_DBASE_CREATE
{
let createdb = XlCreateDatabase::decode(&mut buf);
self.ingest_xlog_dbase_create(&mut modification, &createdb)?;
self.ingest_xlog_dbase_create(modification, &createdb)?;
} else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== pg_constants::XLOG_DBASE_DROP
{
@@ -137,7 +137,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
self.put_slru_page_image(
&mut modification,
modification,
SlruKind::Clog,
segno,
rpageno,
@@ -146,7 +146,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
} else {
assert!(info == pg_constants::CLOG_TRUNCATE);
let xlrec = XlClogTruncate::decode(&mut buf);
self.ingest_clog_truncate_record(&mut modification, &xlrec)?;
self.ingest_clog_truncate_record(modification, &xlrec)?;
}
} else if decoded.xl_rmid == pg_constants::RM_XACT_ID {
let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK;
@@ -154,7 +154,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
let parsed_xact =
XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
self.ingest_xact_record(
&mut modification,
modification,
&parsed_xact,
info == pg_constants::XLOG_XACT_COMMIT,
)?;
@@ -164,7 +164,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
let parsed_xact =
XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
self.ingest_xact_record(
&mut modification,
modification,
&parsed_xact,
info == pg_constants::XLOG_XACT_COMMIT_PREPARED,
)?;
@@ -187,7 +187,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
self.put_slru_page_image(
&mut modification,
modification,
SlruKind::MultiXactOffsets,
segno,
rpageno,
@@ -198,7 +198,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
self.put_slru_page_image(
&mut modification,
modification,
SlruKind::MultiXactMembers,
segno,
rpageno,
@@ -206,14 +206,14 @@ impl<'a, R: Repository> WalIngest<'a, R> {
)?;
} else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
let xlrec = XlMultiXactCreate::decode(&mut buf);
self.ingest_multixact_create_record(&mut modification, &xlrec)?;
self.ingest_multixact_create_record(modification, &xlrec)?;
} else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
let xlrec = XlMultiXactTruncate::decode(&mut buf);
self.ingest_multixact_truncate_record(&mut modification, &xlrec)?;
self.ingest_multixact_truncate_record(modification, &xlrec)?;
}
} else if decoded.xl_rmid == pg_constants::RM_RELMAP_ID {
let xlrec = XlRelmapUpdate::decode(&mut buf);
self.ingest_relmap_page(&mut modification, &xlrec, &decoded)?;
self.ingest_relmap_page(modification, &xlrec, decoded)?;
} else if decoded.xl_rmid == pg_constants::RM_XLOG_ID {
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
if info == pg_constants::XLOG_NEXTOID {
@@ -248,7 +248,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
// Iterate through all the blocks that the record modifies, and
// "put" a separate copy of the record for each block.
for blk in decoded.blocks.iter() {
self.ingest_decoded_block(&mut modification, lsn, &decoded, blk)?;
self.ingest_decoded_block(modification, lsn, decoded, blk)?;
}
// If checkpoint data was updated, store the new version in the repository
@@ -261,7 +261,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
// Now that this record has been fully handled, including updating the
// checkpoint data, let the repository know that it is up-to-date to this LSN
modification.commit()?;
modification.commit(lsn)?;
Ok(())
}
@@ -1069,10 +1069,10 @@ mod tests {
static ZERO_CHECKPOINT: Bytes = Bytes::from_static(&[0u8; SIZEOF_CHECKPOINT]);
fn init_walingest_test<R: Repository>(tline: &DatadirTimeline<R>) -> Result<WalIngest<R>> {
let mut m = tline.begin_modification(Lsn(0x10));
let mut m = tline.begin_modification();
m.put_checkpoint(ZERO_CHECKPOINT.clone())?;
m.put_relmap_file(0, 111, Bytes::from(""))?; // dummy relmapper file
m.commit()?;
m.commit(Lsn(0x10))?;
let walingest = WalIngest::new(tline, Lsn(0x10))?;
Ok(walingest)
@@ -1084,19 +1084,19 @@ mod tests {
let tline = create_test_timeline(repo, TIMELINE_ID)?;
let mut walingest = init_walingest_test(&tline)?;
let mut m = tline.begin_modification(Lsn(0x20));
let mut m = tline.begin_modification();
walingest.put_rel_creation(&mut m, TESTREL_A)?;
walingest.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
m.commit()?;
let mut m = tline.begin_modification(Lsn(0x30));
m.commit(Lsn(0x20))?;
let mut m = tline.begin_modification();
walingest.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 3"))?;
m.commit()?;
let mut m = tline.begin_modification(Lsn(0x40));
m.commit(Lsn(0x30))?;
let mut m = tline.begin_modification();
walingest.put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1 at 4"))?;
m.commit()?;
let mut m = tline.begin_modification(Lsn(0x50));
m.commit(Lsn(0x40))?;
let mut m = tline.begin_modification();
walingest.put_rel_page_image(&mut m, TESTREL_A, 2, TEST_IMG("foo blk 2 at 5"))?;
m.commit()?;
m.commit(Lsn(0x50))?;
assert_current_logical_size(&tline, Lsn(0x50));
@@ -1142,9 +1142,9 @@ mod tests {
);
// Truncate last block
let mut m = tline.begin_modification(Lsn(0x60));
let mut m = tline.begin_modification();
walingest.put_rel_truncation(&mut m, TESTREL_A, 2)?;
m.commit()?;
m.commit(Lsn(0x60))?;
assert_current_logical_size(&tline, Lsn(0x60));
// Check reported size and contents after truncation
@@ -1166,15 +1166,15 @@ mod tests {
);
// Truncate to zero length
let mut m = tline.begin_modification(Lsn(0x68));
let mut m = tline.begin_modification();
walingest.put_rel_truncation(&mut m, TESTREL_A, 0)?;
m.commit()?;
m.commit(Lsn(0x68))?;
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x68))?, 0);
// Extend from 0 to 2 blocks, leaving a gap
let mut m = tline.begin_modification(Lsn(0x70));
let mut m = tline.begin_modification();
walingest.put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1"))?;
m.commit()?;
m.commit(Lsn(0x70))?;
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x70))?, 2);
assert_eq!(
tline.get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x70))?,
@@ -1186,9 +1186,9 @@ mod tests {
);
// Extend a lot more, leaving a big gap that spans across segments
let mut m = tline.begin_modification(Lsn(0x80));
let mut m = tline.begin_modification();
walingest.put_rel_page_image(&mut m, TESTREL_A, 1500, TEST_IMG("foo blk 1500"))?;
m.commit()?;
m.commit(Lsn(0x80))?;
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x80))?, 1501);
for blk in 2..1500 {
assert_eq!(
@@ -1212,18 +1212,18 @@ mod tests {
let tline = create_test_timeline(repo, TIMELINE_ID)?;
let mut walingest = init_walingest_test(&tline)?;
let mut m = tline.begin_modification(Lsn(0x20));
let mut m = tline.begin_modification();
walingest.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
m.commit()?;
m.commit(Lsn(0x20))?;
// Check that rel exists and size is correct
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x20))?, true);
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x20))?, 1);
// Drop rel
let mut m = tline.begin_modification(Lsn(0x30));
let mut m = tline.begin_modification();
walingest.put_rel_drop(&mut m, TESTREL_A)?;
m.commit()?;
m.commit(Lsn(0x30))?;
// Check that rel is not visible anymore
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x30))?, false);
@@ -1232,9 +1232,9 @@ mod tests {
//assert!(tline.get_rel_size(TESTREL_A, Lsn(0x30))?.is_none());
// Re-create it
let mut m = tline.begin_modification(Lsn(0x40));
let mut m = tline.begin_modification();
walingest.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 4"))?;
m.commit()?;
m.commit(Lsn(0x40))?;
// Check that rel exists and size is correct
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x40))?, true);
@@ -1254,12 +1254,12 @@ mod tests {
// Create a 20 MB relation (the size is arbitrary)
let relsize = 20 * 1024 * 1024 / 8192;
let mut m = tline.begin_modification(Lsn(0x20));
let mut m = tline.begin_modification();
for blkno in 0..relsize {
let data = format!("foo blk {} at {}", blkno, Lsn(0x20));
walingest.put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data))?;
}
m.commit()?;
m.commit(Lsn(0x20))?;
// The relation was created at LSN 20, not visible at LSN 1 yet.
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x10))?, false);
@@ -1280,9 +1280,9 @@ mod tests {
// Truncate relation so that second segment was dropped
// - only leave one page
let mut m = tline.begin_modification(Lsn(0x60));
let mut m = tline.begin_modification();
walingest.put_rel_truncation(&mut m, TESTREL_A, 1)?;
m.commit()?;
m.commit(Lsn(0x60))?;
// Check reported size and contents after truncation
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x60))?, 1);
@@ -1310,12 +1310,12 @@ mod tests {
// Extend relation again.
// Add enough blocks to create second segment
let lsn = Lsn(0x80);
let mut m = tline.begin_modification(lsn);
let mut m = tline.begin_modification();
for blkno in 0..relsize {
let data = format!("foo blk {} at {}", blkno, lsn);
walingest.put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data))?;
}
m.commit()?;
m.commit(lsn)?;
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x80))?, true);
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x80))?, relsize);
@@ -1343,10 +1343,10 @@ mod tests {
let mut lsn = 0x10;
for blknum in 0..pg_constants::RELSEG_SIZE + 1 {
lsn += 0x10;
let mut m = tline.begin_modification(Lsn(lsn));
let mut m = tline.begin_modification();
let img = TEST_IMG(&format!("foo blk {} at {}", blknum, Lsn(lsn)));
walingest.put_rel_page_image(&mut m, TESTREL_A, blknum as BlockNumber, img)?;
m.commit()?;
m.commit(Lsn(lsn))?;
}
assert_current_logical_size(&tline, Lsn(lsn));
@@ -1358,9 +1358,9 @@ mod tests {
// Truncate one block
lsn += 0x10;
let mut m = tline.begin_modification(Lsn(lsn));
let mut m = tline.begin_modification();
walingest.put_rel_truncation(&mut m, TESTREL_A, pg_constants::RELSEG_SIZE)?;
m.commit()?;
m.commit(Lsn(lsn))?;
assert_eq!(
tline.get_rel_size(TESTREL_A, Lsn(lsn))?,
pg_constants::RELSEG_SIZE
@@ -1369,9 +1369,9 @@ mod tests {
// Truncate another block
lsn += 0x10;
let mut m = tline.begin_modification(Lsn(lsn));
let mut m = tline.begin_modification();
walingest.put_rel_truncation(&mut m, TESTREL_A, pg_constants::RELSEG_SIZE - 1)?;
m.commit()?;
m.commit(Lsn(lsn))?;
assert_eq!(
tline.get_rel_size(TESTREL_A, Lsn(lsn))?,
pg_constants::RELSEG_SIZE - 1
@@ -1383,9 +1383,9 @@ mod tests {
let mut size: i32 = 3000;
while size >= 0 {
lsn += 0x10;
let mut m = tline.begin_modification(Lsn(lsn));
let mut m = tline.begin_modification();
walingest.put_rel_truncation(&mut m, TESTREL_A, size as BlockNumber)?;
m.commit()?;
m.commit(Lsn(lsn))?;
assert_eq!(
tline.get_rel_size(TESTREL_A, Lsn(lsn))?,
size as BlockNumber

View File

@@ -23,6 +23,7 @@ use crate::{
repository::{Repository, Timeline},
tenant_mgr,
walingest::WalIngest,
walrecord::DecodedWALRecord,
};
use postgres_ffi::waldecoder::WalStreamDecoder;
use utils::{lsn::Lsn, pq_proto::ReplicationFeedback, zid::ZTenantTimelineId};
@@ -150,19 +151,25 @@ pub async fn handle_walreceiver_connection(
waldecoder.feed_bytes(data);
while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
let _enter = info_span!("processing record", lsn = %lsn).entered();
{
let mut decoded = DecodedWALRecord::default();
let mut modification = timeline.begin_modification();
while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
// let _enter = info_span!("processing record", lsn = %lsn).entered();
// It is important to deal with the aligned records as lsn in getPage@LSN is
// aligned and can be several bytes bigger. Without this alignment we are
// at risk of hitting a deadlock.
ensure!(lsn.is_aligned());
// It is important to deal with the aligned records as lsn in getPage@LSN is
// aligned and can be several bytes bigger. Without this alignment we are
// at risk of hitting a deadlock.
ensure!(lsn.is_aligned());
walingest.ingest_record(&timeline, recdata, lsn)?;
walingest
.ingest_record(recdata, lsn, &mut modification, &mut decoded)
.context("could not ingest record at {lsn}")?;
fail_point!("walreceiver-after-ingest");
fail_point!("walreceiver-after-ingest");
last_rec_lsn = lsn;
last_rec_lsn = lsn;
}
}
if !caught_up && endlsn >= end_of_wal {

View File

@@ -96,6 +96,7 @@ impl DecodedBkpBlock {
}
}
#[derive(Default)]
pub struct DecodedWALRecord {
pub xl_xid: TransactionId,
pub xl_info: u8,
@@ -505,7 +506,10 @@ impl XlMultiXactTruncate {
// block data
// ...
// main data
pub fn decode_wal_record(record: Bytes) -> Result<DecodedWALRecord, DeserializeError> {
pub fn decode_wal_record(
record: Bytes,
decoded: &mut DecodedWALRecord,
) -> Result<(), DeserializeError> {
let mut rnode_spcnode: u32 = 0;
let mut rnode_dbnode: u32 = 0;
let mut rnode_relnode: u32 = 0;
@@ -534,7 +538,9 @@ pub fn decode_wal_record(record: Bytes) -> Result<DecodedWALRecord, DeserializeE
let mut blocks_total_len: u32 = 0;
let mut main_data_len = 0;
let mut datatotal: u32 = 0;
let mut blocks: Vec<DecodedBkpBlock> = Vec::new();
if !decoded.blocks.is_empty() {
decoded.blocks.clear();
}
// 2. Decode the headers.
// XLogRecordBlockHeaders if any,
@@ -713,7 +719,7 @@ pub fn decode_wal_record(record: Bytes) -> Result<DecodedWALRecord, DeserializeE
blk.blkno
);
blocks.push(blk);
decoded.blocks.push(blk);
}
_ => {
@@ -724,7 +730,7 @@ pub fn decode_wal_record(record: Bytes) -> Result<DecodedWALRecord, DeserializeE
// 3. Decode blocks.
let mut ptr = record.len() - buf.remaining();
for blk in blocks.iter_mut() {
for blk in decoded.blocks.iter_mut() {
if blk.has_image {
blk.bimg_offset = ptr as u32;
ptr += blk.bimg_len as usize;
@@ -744,14 +750,13 @@ pub fn decode_wal_record(record: Bytes) -> Result<DecodedWALRecord, DeserializeE
assert_eq!(buf.remaining(), main_data_len as usize);
}
Ok(DecodedWALRecord {
xl_xid: xlogrec.xl_xid,
xl_info: xlogrec.xl_info,
xl_rmid: xlogrec.xl_rmid,
record,
blocks,
main_data_offset,
})
decoded.xl_xid = xlogrec.xl_xid;
decoded.xl_info = xlogrec.xl_info;
decoded.xl_rmid = xlogrec.xl_rmid;
decoded.record = record;
decoded.main_data_offset = main_data_offset;
Ok(())
}
///

View File

@@ -1,4 +1,6 @@
import os
import threading
import time
from typing import List
import pytest
@@ -99,3 +101,34 @@ def test_compare_pg_stats_wal_with_pgbench_default(neon_with_baseline: PgCompare
env.pg_bin.run_capture(
['pgbench', f'-T{duration}', f'--random-seed={seed}', '-Mprepared', env.pg.connstr()])
env.flush()
@pytest.mark.parametrize("n_tables", [1, 10])
@pytest.mark.parametrize("duration", get_durations_matrix(10))
def test_compare_pg_stats_wo_with_heavy_write(neon_with_baseline: PgCompare,
n_tables: int,
duration: int,
pg_stats_wo: List[PgStatTable]):
env = neon_with_baseline
with env.pg.connect().cursor() as cur:
for i in range(n_tables):
cur.execute(
f"CREATE TABLE t{i}(key serial primary key, t text default 'foooooooooooooooooooooooooooooooooooooooooooooooooooo')"
)
def start_single_table_workload(table_id: int):
start = time.time()
with env.pg.connect().cursor() as cur:
while time.time() - start < duration:
cur.execute(f"INSERT INTO t{table_id} SELECT FROM generate_series(1,1000)")
with env.record_pg_stats(pg_stats_wo):
threads = [
threading.Thread(target=start_single_table_workload, args=(i, ))
for i in range(n_tables)
]
for thread in threads:
thread.start()
for thread in threads:
thread.join()