avoid coupling DatadirModification with lsn

This commit is contained in:
Thang Pham
2022-06-29 10:57:01 -04:00
parent 6571752508
commit 6d64fa77f7
5 changed files with 62 additions and 87 deletions

View File

@@ -39,7 +39,7 @@ pub fn import_timeline_from_postgres_datadir<R: Repository>(
// TODO this shoud be start_lsn, which is not necessarily equal to end_lsn (aka lsn)
// Then fishing out pg_control would be unnecessary
let mut modification = tline.begin_modification(lsn);
let mut modification = tline.begin_modification();
modification.init_empty()?;
// Import all but pg_wal
@@ -62,7 +62,7 @@ pub fn import_timeline_from_postgres_datadir<R: Repository>(
}
// We're done importing all the data files.
modification.commit()?;
modification.commit(lsn)?;
drop(modification);
// We expect the Postgres server to be shut down cleanly.
@@ -269,11 +269,10 @@ fn import_wal<R: Repository>(
waldecoder.feed_bytes(&buf);
let mut nrecords = 0;
let mut modification = tline.begin_modification(last_lsn);
let mut modification = tline.begin_modification();
let mut decoded = DecodedWALRecord::default();
while last_lsn <= endpoint {
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
modification.lsn = lsn;
walingest.ingest_record(recdata, lsn, &mut modification, &mut decoded)?;
last_lsn = lsn;
@@ -304,7 +303,7 @@ pub fn import_basebackup_from_tar<R: Repository, Reader: Read>(
base_lsn: Lsn,
) -> Result<()> {
info!("importing base at {}", base_lsn);
let mut modification = tline.begin_modification(base_lsn);
let mut modification = tline.begin_modification();
modification.init_empty()?;
let mut pg_control: Option<ControlFileData> = None;
@@ -335,7 +334,7 @@ pub fn import_basebackup_from_tar<R: Repository, Reader: Read>(
// sanity check: ensure that pg_control is loaded
let _pg_control = pg_control.context("pg_control file not found")?;
modification.commit()?;
modification.commit(base_lsn)?;
Ok(())
}
@@ -387,11 +386,10 @@ pub fn import_wal_from_tar<R: Repository, Reader: Read>(
waldecoder.feed_bytes(&bytes[offset..]);
let mut modification = tline.begin_modification(last_lsn);
let mut modification = tline.begin_modification();
let mut decoded = DecodedWALRecord::default();
while last_lsn <= end_lsn {
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
modification.lsn = lsn;
walingest.ingest_record(recdata, lsn, &mut modification, &mut decoded)?;
last_lsn = lsn;

View File

@@ -79,27 +79,25 @@ impl<R: Repository> DatadirTimeline<R> {
/// the timeline.
///
/// This provides a transaction-like interface to perform a bunch
/// of modifications atomically, all stamped with one LSN.
/// of modifications atomically.
///
/// To ingest a WAL record, call begin_modification(lsn) to get a
/// To ingest a WAL record, call begin_modification() to get a
/// DatadirModification object. Use the functions in the object to
/// modify the repository state, updating all the pages and metadata
/// that the WAL record affects. When you're done, call commit() to
/// commit the changes.
/// that the WAL record affects. When you're done, call commit(lsn) to
/// commit the changes. All the changes will be stamped with the specified LSN.
///
/// Note that any pending modifications you make through the
/// modification object won't be visible to calls to the 'get' and list
/// functions of the timeline until you finish! And if you update the
/// same page twice, the last update wins.
///
pub fn begin_modification(&self, lsn: Lsn) -> DatadirModification<R> {
pub fn begin_modification(&self) -> DatadirModification<R> {
DatadirModification {
tline: self,
lsn,
pending_updates: HashMap::new(),
pending_deletions: Vec::new(),
pending_nblocks: 0,
writer: self.tline.writer(),
}
}
@@ -533,16 +531,12 @@ pub struct DatadirModification<'a, R: Repository> {
/// in the state in 'tline' yet.
pub tline: &'a DatadirTimeline<R>,
pub lsn: Lsn,
// The modifications are not applied directly to the underlying key-value store.
// The put-functions add the modifications here, and they are flushed to the
// underlying key-value store by the 'finish' function.
pending_updates: HashMap<Key, Value>,
pending_deletions: Vec<Range<Key>>,
pending_nblocks: isize,
writer: Box<dyn TimelineWriter<'a> + 'a>,
}
impl<'a, R: Repository> DatadirModification<'a, R> {
@@ -907,19 +901,22 @@ impl<'a, R: Repository> DatadirModification<'a, R> {
///
/// Finish this atomic update, writing all the updated keys to the
/// underlying timeline.
/// All the modifications in this atomic update are stamped by the specified LSN.
///
pub fn commit(&mut self) -> Result<()> {
pub fn commit(&mut self, lsn: Lsn) -> Result<()> {
let writer = self.tline.tline.writer();
let pending_nblocks = self.pending_nblocks;
self.pending_nblocks = 0;
for (key, value) in self.pending_updates.drain() {
self.writer.put(key, self.lsn, value)?;
writer.put(key, lsn, value)?;
}
for key_range in self.pending_deletions.drain(..) {
self.writer.delete(key_range, self.lsn)?;
writer.delete(key_range, lsn)?;
}
self.writer.finish_write(self.lsn);
writer.finish_write(lsn);
if pending_nblocks != 0 {
self.tline.current_logical_size.fetch_add(
@@ -1347,9 +1344,9 @@ pub fn create_test_timeline<R: Repository>(
) -> Result<Arc<crate::DatadirTimeline<R>>> {
let tline = repo.create_empty_timeline(timeline_id, Lsn(8))?;
let tline = DatadirTimeline::new(tline, 256 * 1024);
let mut m = tline.begin_modification(Lsn(8));
let mut m = tline.begin_modification();
m.init_empty()?;
m.commit()?;
m.commit(Lsn(8))?;
drop(m);
Ok(Arc::new(tline))
}

View File

@@ -261,7 +261,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
// Now that this record has been fully handled, including updating the
// checkpoint data, let the repository know that it is up-to-date to this LSN
modification.commit()?;
modification.commit(lsn)?;
Ok(())
}
@@ -1069,11 +1069,10 @@ mod tests {
static ZERO_CHECKPOINT: Bytes = Bytes::from_static(&[0u8; SIZEOF_CHECKPOINT]);
fn init_walingest_test<R: Repository>(tline: &DatadirTimeline<R>) -> Result<WalIngest<R>> {
let mut m = tline.begin_modification(Lsn(0x10));
let mut m = tline.begin_modification();
m.put_checkpoint(ZERO_CHECKPOINT.clone())?;
m.put_relmap_file(0, 111, Bytes::from(""))?; // dummy relmapper file
m.commit()?;
drop(m);
m.commit(Lsn(0x10))?;
let walingest = WalIngest::new(tline, Lsn(0x10))?;
Ok(walingest)
@@ -1085,23 +1084,19 @@ mod tests {
let tline = create_test_timeline(repo, TIMELINE_ID)?;
let mut walingest = init_walingest_test(&tline)?;
let mut m = tline.begin_modification(Lsn(0x20));
let mut m = tline.begin_modification();
walingest.put_rel_creation(&mut m, TESTREL_A)?;
walingest.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
m.commit()?;
drop(m);
let mut m = tline.begin_modification(Lsn(0x30));
m.commit(Lsn(0x20))?;
let mut m = tline.begin_modification();
walingest.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 3"))?;
m.commit()?;
drop(m);
let mut m = tline.begin_modification(Lsn(0x40));
m.commit(Lsn(0x30))?;
let mut m = tline.begin_modification();
walingest.put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1 at 4"))?;
m.commit()?;
drop(m);
let mut m = tline.begin_modification(Lsn(0x50));
m.commit(Lsn(0x40))?;
let mut m = tline.begin_modification();
walingest.put_rel_page_image(&mut m, TESTREL_A, 2, TEST_IMG("foo blk 2 at 5"))?;
m.commit()?;
drop(m);
m.commit(Lsn(0x50))?;
assert_current_logical_size(&tline, Lsn(0x50));
@@ -1147,10 +1142,9 @@ mod tests {
);
// Truncate last block
let mut m = tline.begin_modification(Lsn(0x60));
let mut m = tline.begin_modification();
walingest.put_rel_truncation(&mut m, TESTREL_A, 2)?;
m.commit()?;
drop(m);
m.commit(Lsn(0x60))?;
assert_current_logical_size(&tline, Lsn(0x60));
// Check reported size and contents after truncation
@@ -1172,17 +1166,15 @@ mod tests {
);
// Truncate to zero length
let mut m = tline.begin_modification(Lsn(0x68));
let mut m = tline.begin_modification();
walingest.put_rel_truncation(&mut m, TESTREL_A, 0)?;
m.commit()?;
drop(m);
m.commit(Lsn(0x68))?;
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x68))?, 0);
// Extend from 0 to 2 blocks, leaving a gap
let mut m = tline.begin_modification(Lsn(0x70));
let mut m = tline.begin_modification();
walingest.put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1"))?;
m.commit()?;
drop(m);
m.commit(Lsn(0x70))?;
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x70))?, 2);
assert_eq!(
tline.get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x70))?,
@@ -1194,10 +1186,9 @@ mod tests {
);
// Extend a lot more, leaving a big gap that spans across segments
let mut m = tline.begin_modification(Lsn(0x80));
let mut m = tline.begin_modification();
walingest.put_rel_page_image(&mut m, TESTREL_A, 1500, TEST_IMG("foo blk 1500"))?;
m.commit()?;
drop(m);
m.commit(Lsn(0x80))?;
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x80))?, 1501);
for blk in 2..1500 {
assert_eq!(
@@ -1221,20 +1212,18 @@ mod tests {
let tline = create_test_timeline(repo, TIMELINE_ID)?;
let mut walingest = init_walingest_test(&tline)?;
let mut m = tline.begin_modification(Lsn(0x20));
let mut m = tline.begin_modification();
walingest.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
m.commit()?;
drop(m);
m.commit(Lsn(0x20))?;
// Check that rel exists and size is correct
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x20))?, true);
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x20))?, 1);
// Drop rel
let mut m = tline.begin_modification(Lsn(0x30));
let mut m = tline.begin_modification();
walingest.put_rel_drop(&mut m, TESTREL_A)?;
m.commit()?;
drop(m);
m.commit(Lsn(0x30))?;
// Check that rel is not visible anymore
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x30))?, false);
@@ -1243,10 +1232,9 @@ mod tests {
//assert!(tline.get_rel_size(TESTREL_A, Lsn(0x30))?.is_none());
// Re-create it
let mut m = tline.begin_modification(Lsn(0x40));
let mut m = tline.begin_modification();
walingest.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 4"))?;
m.commit()?;
drop(m);
m.commit(Lsn(0x40))?;
// Check that rel exists and size is correct
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x40))?, true);
@@ -1266,13 +1254,12 @@ mod tests {
// Create a 20 MB relation (the size is arbitrary)
let relsize = 20 * 1024 * 1024 / 8192;
let mut m = tline.begin_modification(Lsn(0x20));
let mut m = tline.begin_modification();
for blkno in 0..relsize {
let data = format!("foo blk {} at {}", blkno, Lsn(0x20));
walingest.put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data))?;
}
m.commit()?;
drop(m);
m.commit(Lsn(0x20))?;
// The relation was created at LSN 20, not visible at LSN 1 yet.
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x10))?, false);
@@ -1293,10 +1280,9 @@ mod tests {
// Truncate relation so that second segment was dropped
// - only leave one page
let mut m = tline.begin_modification(Lsn(0x60));
let mut m = tline.begin_modification();
walingest.put_rel_truncation(&mut m, TESTREL_A, 1)?;
m.commit()?;
drop(m);
m.commit(Lsn(0x60))?;
// Check reported size and contents after truncation
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x60))?, 1);
@@ -1324,13 +1310,12 @@ mod tests {
// Extend relation again.
// Add enough blocks to create second segment
let lsn = Lsn(0x80);
let mut m = tline.begin_modification(lsn);
let mut m = tline.begin_modification();
for blkno in 0..relsize {
let data = format!("foo blk {} at {}", blkno, lsn);
walingest.put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data))?;
}
m.commit()?;
drop(m);
m.commit(lsn)?;
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x80))?, true);
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x80))?, relsize);
@@ -1358,11 +1343,10 @@ mod tests {
let mut lsn = 0x10;
for blknum in 0..pg_constants::RELSEG_SIZE + 1 {
lsn += 0x10;
let mut m = tline.begin_modification(Lsn(lsn));
let mut m = tline.begin_modification();
let img = TEST_IMG(&format!("foo blk {} at {}", blknum, Lsn(lsn)));
walingest.put_rel_page_image(&mut m, TESTREL_A, blknum as BlockNumber, img)?;
m.commit()?;
drop(m);
m.commit(Lsn(lsn))?;
}
assert_current_logical_size(&tline, Lsn(lsn));
@@ -1374,10 +1358,9 @@ mod tests {
// Truncate one block
lsn += 0x10;
let mut m = tline.begin_modification(Lsn(lsn));
let mut m = tline.begin_modification();
walingest.put_rel_truncation(&mut m, TESTREL_A, pg_constants::RELSEG_SIZE)?;
m.commit()?;
drop(m);
m.commit(Lsn(lsn))?;
assert_eq!(
tline.get_rel_size(TESTREL_A, Lsn(lsn))?,
pg_constants::RELSEG_SIZE
@@ -1386,10 +1369,9 @@ mod tests {
// Truncate another block
lsn += 0x10;
let mut m = tline.begin_modification(Lsn(lsn));
let mut m = tline.begin_modification();
walingest.put_rel_truncation(&mut m, TESTREL_A, pg_constants::RELSEG_SIZE - 1)?;
m.commit()?;
drop(m);
m.commit(Lsn(lsn))?;
assert_eq!(
tline.get_rel_size(TESTREL_A, Lsn(lsn))?,
pg_constants::RELSEG_SIZE - 1
@@ -1401,10 +1383,9 @@ mod tests {
let mut size: i32 = 3000;
while size >= 0 {
lsn += 0x10;
let mut m = tline.begin_modification(Lsn(lsn));
let mut m = tline.begin_modification();
walingest.put_rel_truncation(&mut m, TESTREL_A, size as BlockNumber)?;
m.commit()?;
drop(m);
m.commit(Lsn(lsn))?;
assert_eq!(
tline.get_rel_size(TESTREL_A, Lsn(lsn))?,
size as BlockNumber

View File

@@ -155,7 +155,7 @@ pub async fn handle_walreceiver_connection(
// timer = std::time::Instant::now();
{
let mut decoded = DecodedWALRecord::default();
let mut modification = timeline.begin_modification(last_rec_lsn);
let mut modification = timeline.begin_modification();
while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
// let _enter = info_span!("processing record", lsn = %lsn).entered();
@@ -165,7 +165,6 @@ pub async fn handle_walreceiver_connection(
// at risk of hitting a deadlock.
ensure!(lsn.is_aligned());
modification.lsn = lsn;
walingest.ingest_record(recdata, lsn, &mut modification, &mut decoded)?;
fail_point!("walreceiver-after-ingest");

View File

@@ -103,10 +103,10 @@ def test_compare_pg_stats_wal_with_pgbench_default(neon_with_baseline: PgCompare
@pytest.mark.parametrize("duration", [10, 30])
def test_compare_pg_stats_wo_with_simple_write(neon_with_baseline: PgCompare,
def test_compare_pg_stats_wo_with_simple_write(neon_compare: PgCompare,
duration: int,
pg_stats_wo: List[PgStatTable]):
env = neon_with_baseline
env = neon_compare
with env.pg.connect().cursor() as cur:
cur.execute(
"CREATE TABLE foo(key serial primary key, t text default 'foooooooooooooooooooooooooooooooooooooooooooooooooooo')"