This commit is contained in:
Thang Pham
2022-06-29 15:27:09 -04:00
parent 42249823aa
commit 0e2462e5f2
4 changed files with 16 additions and 34 deletions

View File

@@ -63,7 +63,6 @@ pub fn import_timeline_from_postgres_datadir<R: Repository>(
// We're done importing all the data files.
modification.commit(lsn)?;
drop(modification);
// We expect the Postgres server to be shut down cleanly.
let pg_control = pg_control.context("pg_control file not found")?;

View File

@@ -660,16 +660,7 @@ impl DeltaLayerWriter {
/// The values must be appended in key, lsn order.
///
pub fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> Result<()> {
assert!(self.lsn_range.start <= lsn);
let off = self.blob_writer.write_blob(&Value::ser(&val)?)?;
let blob_ref = BlobRef::new(off, val.will_init());
let delta_key = DeltaKey::from_key_lsn(&key, lsn);
self.tree.append(&delta_key.0, blob_ref.0)?;
Ok(())
self.put_value_bytes(key, lsn, &Value::ser(&val)?, val.will_init())
}
pub fn put_value_bytes(

View File

@@ -87,6 +87,9 @@ impl<R: Repository> DatadirTimeline<R> {
/// that the WAL record affects. When you're done, call commit(lsn) to
/// commit the changes. All the changes will be stamped with the specified LSN.
///
/// Calling commit(lsn) will flush all the changes and reset the state,
/// so the `DatadirModification` struct can be reused to perform the next modification.
///
/// Note that any pending modifications you make through the
/// modification object won't be visible to calls to the 'get' and list
/// functions of the timeline until you finish! And if you update the
@@ -1347,7 +1350,6 @@ pub fn create_test_timeline<R: Repository>(
let mut m = tline.begin_modification();
m.init_empty()?;
m.commit(Lsn(8))?;
drop(m);
Ok(Arc::new(tline))
}

View File

@@ -129,6 +129,9 @@ pub async fn handle_walreceiver_connection(
let mut walingest = WalIngest::new(timeline.as_ref(), startpoint)?;
let mut decoded = DecodedWALRecord::default();
let mut modification = timeline.begin_modification();
while let Some(replication_message) = {
select! {
_ = cancellation.changed() => {
@@ -151,33 +154,20 @@ pub async fn handle_walreceiver_connection(
waldecoder.feed_bytes(data);
// let mut n_records = 0;
// timer = std::time::Instant::now();
{
let mut decoded = DecodedWALRecord::default();
let mut modification = timeline.begin_modification();
while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
// let _enter = info_span!("processing record", lsn = %lsn).entered();
while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
// let _enter = info_span!("processing record", lsn = %lsn).entered();
// It is important to deal with the aligned records as lsn in getPage@LSN is
// aligned and can be several bytes bigger. Without this alignment we are
// at risk of hitting a deadlock.
ensure!(lsn.is_aligned());
// It is important to deal with the aligned records as lsn in getPage@LSN is
// aligned and can be several bytes bigger. Without this alignment we are
// at risk of hitting a deadlock.
ensure!(lsn.is_aligned());
walingest.ingest_record(recdata, lsn, &mut modification, &mut decoded)?;
walingest.ingest_record(recdata, lsn, &mut modification, &mut decoded)?;
fail_point!("walreceiver-after-ingest");
fail_point!("walreceiver-after-ingest");
last_rec_lsn = lsn;
// n_records += 1;
}
last_rec_lsn = lsn;
}
// info!(
// "Processing {n_records} records took {}us",
// timer.elapsed().as_micros()
// );
if !caught_up && endlsn >= end_of_wal {
info!("caught up at LSN {endlsn}");