mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-06 21:12:55 +00:00
Move relation size cache from WalIngest to DatadirTimeline (#2094)
* Move relation sie cache to layered timeline * Fix obtaining current LSN for relation size cache * Resolve merge conflicts * Resolve merge conflicts * Reestore 'lsn' field in DatadirModification * adjust DatadirModification lsn in ingest_record * Fix formatting * Pass lsn to get_relsize * Fix merge conflict * Update pageserver/src/pgdatadir_mapping.rs Co-authored-by: Heikki Linnakangas <heikki@zenith.tech> * Update pageserver/src/pgdatadir_mapping.rs Co-authored-by: Heikki Linnakangas <heikki@zenith.tech> Co-authored-by: Heikki Linnakangas <heikki@zenith.tech>
This commit is contained in:
committed by
GitHub
parent
4cb1074fe5
commit
5133db44e1
@@ -37,7 +37,7 @@ pub fn import_timeline_from_postgres_datadir<T: DatadirTimeline>(
|
||||
|
||||
// TODO this shoud be start_lsn, which is not necessarily equal to end_lsn (aka lsn)
|
||||
// Then fishing out pg_control would be unnecessary
|
||||
let mut modification = tline.begin_modification();
|
||||
let mut modification = tline.begin_modification(lsn);
|
||||
modification.init_empty()?;
|
||||
|
||||
// Import all but pg_wal
|
||||
@@ -56,12 +56,12 @@ pub fn import_timeline_from_postgres_datadir<T: DatadirTimeline>(
|
||||
if let Some(control_file) = import_file(&mut modification, relative_path, file, len)? {
|
||||
pg_control = Some(control_file);
|
||||
}
|
||||
modification.flush(lsn)?;
|
||||
modification.flush()?;
|
||||
}
|
||||
}
|
||||
|
||||
// We're done importing all the data files.
|
||||
modification.commit(lsn)?;
|
||||
modification.commit()?;
|
||||
|
||||
// We expect the Postgres server to be shut down cleanly.
|
||||
let pg_control = pg_control.context("pg_control file not found")?;
|
||||
@@ -267,7 +267,7 @@ fn import_wal<T: DatadirTimeline>(
|
||||
waldecoder.feed_bytes(&buf);
|
||||
|
||||
let mut nrecords = 0;
|
||||
let mut modification = tline.begin_modification();
|
||||
let mut modification = tline.begin_modification(endpoint);
|
||||
let mut decoded = DecodedWALRecord::default();
|
||||
while last_lsn <= endpoint {
|
||||
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
|
||||
@@ -301,7 +301,7 @@ pub fn import_basebackup_from_tar<T: DatadirTimeline, Reader: Read>(
|
||||
base_lsn: Lsn,
|
||||
) -> Result<()> {
|
||||
info!("importing base at {}", base_lsn);
|
||||
let mut modification = tline.begin_modification();
|
||||
let mut modification = tline.begin_modification(base_lsn);
|
||||
modification.init_empty()?;
|
||||
|
||||
let mut pg_control: Option<ControlFileData> = None;
|
||||
@@ -319,7 +319,7 @@ pub fn import_basebackup_from_tar<T: DatadirTimeline, Reader: Read>(
|
||||
// We found the pg_control file.
|
||||
pg_control = Some(res);
|
||||
}
|
||||
modification.flush(base_lsn)?;
|
||||
modification.flush()?;
|
||||
}
|
||||
tar::EntryType::Directory => {
|
||||
debug!("directory {:?}", file_path);
|
||||
@@ -333,7 +333,7 @@ pub fn import_basebackup_from_tar<T: DatadirTimeline, Reader: Read>(
|
||||
// sanity check: ensure that pg_control is loaded
|
||||
let _pg_control = pg_control.context("pg_control file not found")?;
|
||||
|
||||
modification.commit(base_lsn)?;
|
||||
modification.commit()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -385,7 +385,7 @@ pub fn import_wal_from_tar<T: DatadirTimeline, Reader: Read>(
|
||||
|
||||
waldecoder.feed_bytes(&bytes[offset..]);
|
||||
|
||||
let mut modification = tline.begin_modification();
|
||||
let mut modification = tline.begin_modification(end_lsn);
|
||||
let mut decoded = DecodedWALRecord::default();
|
||||
while last_lsn <= end_lsn {
|
||||
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
|
||||
|
||||
@@ -8,7 +8,7 @@ use lazy_static::lazy_static;
|
||||
use tracing::*;
|
||||
|
||||
use std::cmp::{max, min, Ordering};
|
||||
use std::collections::HashSet;
|
||||
use std::collections::{hash_map::Entry, HashMap, HashSet};
|
||||
use std::fs;
|
||||
use std::fs::{File, OpenOptions};
|
||||
use std::io::Write;
|
||||
@@ -38,7 +38,9 @@ use crate::layered_repository::{
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
use crate::keyspace::{KeyPartitioning, KeySpace};
|
||||
use crate::pgdatadir_mapping::BlockNumber;
|
||||
use crate::pgdatadir_mapping::LsnForTimestamp;
|
||||
use crate::reltag::RelTag;
|
||||
use crate::tenant_config::TenantConfOpt;
|
||||
use crate::DatadirTimeline;
|
||||
|
||||
@@ -295,6 +297,9 @@ pub struct LayeredTimeline {
|
||||
/// or None if WAL receiver has not received anything for this timeline
|
||||
/// yet.
|
||||
pub last_received_wal: Mutex<Option<WalReceiverInfo>>,
|
||||
|
||||
/// Relation size cache
|
||||
rel_size_cache: RwLock<HashMap<RelTag, (Lsn, BlockNumber)>>,
|
||||
}
|
||||
|
||||
pub struct WalReceiverInfo {
|
||||
@@ -306,7 +311,42 @@ pub struct WalReceiverInfo {
|
||||
/// Inherit all the functions from DatadirTimeline, to provide the
|
||||
/// functionality to store PostgreSQL relations, SLRUs, etc. in a
|
||||
/// LayeredTimeline.
|
||||
impl DatadirTimeline for LayeredTimeline {}
|
||||
impl DatadirTimeline for LayeredTimeline {
|
||||
fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option<BlockNumber> {
|
||||
let rel_size_cache = self.rel_size_cache.read().unwrap();
|
||||
if let Some((cached_lsn, nblocks)) = rel_size_cache.get(tag) {
|
||||
if lsn >= *cached_lsn {
|
||||
return Some(*nblocks);
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
|
||||
let mut rel_size_cache = self.rel_size_cache.write().unwrap();
|
||||
match rel_size_cache.entry(tag) {
|
||||
Entry::Occupied(mut entry) => {
|
||||
let cached_lsn = entry.get_mut();
|
||||
if lsn >= cached_lsn.0 {
|
||||
*cached_lsn = (lsn, nblocks);
|
||||
}
|
||||
}
|
||||
Entry::Vacant(entry) => {
|
||||
entry.insert((lsn, nblocks));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
|
||||
let mut rel_size_cache = self.rel_size_cache.write().unwrap();
|
||||
rel_size_cache.insert(tag, (lsn, nblocks));
|
||||
}
|
||||
|
||||
fn remove_cached_rel_size(&self, tag: &RelTag) {
|
||||
let mut rel_size_cache = self.rel_size_cache.write().unwrap();
|
||||
rel_size_cache.remove(tag);
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// Information about how much history needs to be retained, needed by
|
||||
@@ -377,8 +417,6 @@ impl Timeline for LayeredTimeline {
|
||||
|
||||
/// Look up the value with the given a key
|
||||
fn get(&self, key: Key, lsn: Lsn) -> Result<Bytes> {
|
||||
debug_assert!(lsn <= self.get_last_record_lsn());
|
||||
|
||||
// Check the page cache. We will get back the most recent page with lsn <= `lsn`.
|
||||
// The cached image can be returned directly if there is no WAL between the cached image
|
||||
// and requested LSN. The cached image can also be used to reduce the amount of WAL needed
|
||||
@@ -618,6 +656,7 @@ impl LayeredTimeline {
|
||||
repartition_threshold: 0,
|
||||
|
||||
last_received_wal: Mutex::new(None),
|
||||
rel_size_cache: RwLock::new(HashMap::new()),
|
||||
};
|
||||
result.repartition_threshold = result.get_checkpoint_distance() / 10;
|
||||
result
|
||||
|
||||
@@ -56,13 +56,16 @@ pub trait DatadirTimeline: Timeline {
|
||||
/// This provides a transaction-like interface to perform a bunch
|
||||
/// of modifications atomically.
|
||||
///
|
||||
/// To ingest a WAL record, call begin_modification() to get a
|
||||
/// To ingest a WAL record, call begin_modification(lsn) to get a
|
||||
/// DatadirModification object. Use the functions in the object to
|
||||
/// modify the repository state, updating all the pages and metadata
|
||||
/// that the WAL record affects. When you're done, call commit(lsn) to
|
||||
/// commit the changes. All the changes will be stamped with the specified LSN.
|
||||
/// that the WAL record affects. When you're done, call commit() to
|
||||
/// commit the changes.
|
||||
///
|
||||
/// Calling commit(lsn) will flush all the changes and reset the state,
|
||||
/// Lsn stored in modification is advanced by `ingest_record` and
|
||||
/// is used by `commit()` to update `last_record_lsn`.
|
||||
///
|
||||
/// Calling commit() will flush all the changes and reset the state,
|
||||
/// so the `DatadirModification` struct can be reused to perform the next modification.
|
||||
///
|
||||
/// Note that any pending modifications you make through the
|
||||
@@ -70,7 +73,7 @@ pub trait DatadirTimeline: Timeline {
|
||||
/// functions of the timeline until you finish! And if you update the
|
||||
/// same page twice, the last update wins.
|
||||
///
|
||||
fn begin_modification(&self) -> DatadirModification<Self>
|
||||
fn begin_modification(&self, lsn: Lsn) -> DatadirModification<Self>
|
||||
where
|
||||
Self: Sized,
|
||||
{
|
||||
@@ -79,6 +82,7 @@ pub trait DatadirTimeline: Timeline {
|
||||
pending_updates: HashMap::new(),
|
||||
pending_deletions: Vec::new(),
|
||||
pending_nblocks: 0,
|
||||
lsn,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -120,6 +124,10 @@ pub trait DatadirTimeline: Timeline {
|
||||
fn get_rel_size(&self, tag: RelTag, lsn: Lsn) -> Result<BlockNumber> {
|
||||
ensure!(tag.relnode != 0, "invalid relnode");
|
||||
|
||||
if let Some(nblocks) = self.get_cached_rel_size(&tag, lsn) {
|
||||
return Ok(nblocks);
|
||||
}
|
||||
|
||||
if (tag.forknum == pg_constants::FSM_FORKNUM
|
||||
|| tag.forknum == pg_constants::VISIBILITYMAP_FORKNUM)
|
||||
&& !self.get_rel_exists(tag, lsn)?
|
||||
@@ -133,13 +141,21 @@ pub trait DatadirTimeline: Timeline {
|
||||
|
||||
let key = rel_size_to_key(tag);
|
||||
let mut buf = self.get(key, lsn)?;
|
||||
Ok(buf.get_u32_le())
|
||||
let nblocks = buf.get_u32_le();
|
||||
|
||||
// Update relation size cache
|
||||
self.update_cached_rel_size(tag, lsn, nblocks);
|
||||
Ok(nblocks)
|
||||
}
|
||||
|
||||
/// Does relation exist?
|
||||
fn get_rel_exists(&self, tag: RelTag, lsn: Lsn) -> Result<bool> {
|
||||
ensure!(tag.relnode != 0, "invalid relnode");
|
||||
|
||||
// first try to lookup relation in cache
|
||||
if let Some(_nblocks) = self.get_cached_rel_size(&tag, lsn) {
|
||||
return Ok(true);
|
||||
}
|
||||
// fetch directory listing
|
||||
let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
|
||||
let buf = self.get(key, lsn)?;
|
||||
@@ -445,6 +461,18 @@ pub trait DatadirTimeline: Timeline {
|
||||
|
||||
Ok(result.to_keyspace())
|
||||
}
|
||||
|
||||
/// Get cached size of relation if it not updated after specified LSN
|
||||
fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option<BlockNumber>;
|
||||
|
||||
/// Update cached relation size if there is no more recent update
|
||||
fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber);
|
||||
|
||||
/// Store cached relation size
|
||||
fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber);
|
||||
|
||||
/// Remove cached relation size
|
||||
fn remove_cached_rel_size(&self, tag: &RelTag);
|
||||
}
|
||||
|
||||
/// DatadirModification represents an operation to ingest an atomic set of
|
||||
@@ -457,6 +485,9 @@ pub struct DatadirModification<'a, T: DatadirTimeline> {
|
||||
/// in the state in 'tline' yet.
|
||||
pub tline: &'a T,
|
||||
|
||||
/// Lsn assigned by begin_modification
|
||||
pub lsn: Lsn,
|
||||
|
||||
// The modifications are not applied directly to the underlying key-value store.
|
||||
// The put-functions add the modifications here, and they are flushed to the
|
||||
// underlying key-value store by the 'finish' function.
|
||||
@@ -666,9 +697,11 @@ impl<'a, T: DatadirTimeline> DatadirModification<'a, T> {
|
||||
|
||||
self.pending_nblocks += nblocks as isize;
|
||||
|
||||
// Update relation size cache
|
||||
self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
|
||||
|
||||
// Even if nblocks > 0, we don't insert any actual blocks here. That's up to the
|
||||
// caller.
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -684,6 +717,9 @@ impl<'a, T: DatadirTimeline> DatadirModification<'a, T> {
|
||||
let buf = nblocks.to_le_bytes();
|
||||
self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
|
||||
|
||||
// Update relation size cache
|
||||
self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
|
||||
|
||||
// Update logical database size.
|
||||
self.pending_nblocks -= old_size as isize - nblocks as isize;
|
||||
Ok(())
|
||||
@@ -703,6 +739,9 @@ impl<'a, T: DatadirTimeline> DatadirModification<'a, T> {
|
||||
let buf = nblocks.to_le_bytes();
|
||||
self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
|
||||
|
||||
// Update relation size cache
|
||||
self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
|
||||
|
||||
self.pending_nblocks += nblocks as isize - old_size as isize;
|
||||
}
|
||||
Ok(())
|
||||
@@ -728,6 +767,9 @@ impl<'a, T: DatadirTimeline> DatadirModification<'a, T> {
|
||||
let old_size = self.get(size_key)?.get_u32_le();
|
||||
self.pending_nblocks -= old_size as isize;
|
||||
|
||||
// Remove enty from relation size cache
|
||||
self.tline.remove_cached_rel_size(&rel);
|
||||
|
||||
// Delete size entry, as well as all blocks
|
||||
self.delete(rel_key_range(rel));
|
||||
|
||||
@@ -842,7 +884,7 @@ impl<'a, T: DatadirTimeline> DatadirModification<'a, T> {
|
||||
/// retains all the metadata, but data pages are flushed. That's again OK
|
||||
/// for bulk import, where you are just loading data pages and won't try to
|
||||
/// modify the same pages twice.
|
||||
pub fn flush(&mut self, lsn: Lsn) -> Result<()> {
|
||||
pub fn flush(&mut self) -> Result<()> {
|
||||
// Unless we have accumulated a decent amount of changes, it's not worth it
|
||||
// to scan through the pending_updates list.
|
||||
let pending_nblocks = self.pending_nblocks;
|
||||
@@ -856,7 +898,7 @@ impl<'a, T: DatadirTimeline> DatadirModification<'a, T> {
|
||||
let mut result: Result<()> = Ok(());
|
||||
self.pending_updates.retain(|&key, value| {
|
||||
if result.is_ok() && (is_rel_block_key(key) || is_slru_block_key(key)) {
|
||||
result = writer.put(key, lsn, value);
|
||||
result = writer.put(key, self.lsn, value);
|
||||
false
|
||||
} else {
|
||||
true
|
||||
@@ -877,9 +919,9 @@ impl<'a, T: DatadirTimeline> DatadirModification<'a, T> {
|
||||
/// underlying timeline.
|
||||
/// All the modifications in this atomic update are stamped by the specified LSN.
|
||||
///
|
||||
pub fn commit(&mut self, lsn: Lsn) -> Result<()> {
|
||||
pub fn commit(&mut self) -> Result<()> {
|
||||
let writer = self.tline.writer();
|
||||
|
||||
let lsn = self.lsn;
|
||||
let pending_nblocks = self.pending_nblocks;
|
||||
self.pending_nblocks = 0;
|
||||
|
||||
@@ -1324,9 +1366,9 @@ pub fn create_test_timeline<R: Repository>(
|
||||
timeline_id: utils::zid::ZTimelineId,
|
||||
) -> Result<std::sync::Arc<R::Timeline>> {
|
||||
let tline = repo.create_empty_timeline(timeline_id, Lsn(8))?;
|
||||
let mut m = tline.begin_modification();
|
||||
let mut m = tline.begin_modification(Lsn(8));
|
||||
m.init_empty()?;
|
||||
m.commit(Lsn(8))?;
|
||||
m.commit()?;
|
||||
Ok(tline)
|
||||
}
|
||||
|
||||
|
||||
@@ -30,8 +30,6 @@ use anyhow::Result;
|
||||
use bytes::{Buf, Bytes, BytesMut};
|
||||
use tracing::*;
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use crate::pgdatadir_mapping::*;
|
||||
use crate::reltag::{RelTag, SlruKind};
|
||||
use crate::walrecord::*;
|
||||
@@ -48,8 +46,6 @@ pub struct WalIngest<'a, T: DatadirTimeline> {
|
||||
|
||||
checkpoint: CheckPoint,
|
||||
checkpoint_modified: bool,
|
||||
|
||||
relsize_cache: HashMap<RelTag, BlockNumber>,
|
||||
}
|
||||
|
||||
impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
|
||||
@@ -64,13 +60,13 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
|
||||
timeline,
|
||||
checkpoint,
|
||||
checkpoint_modified: false,
|
||||
relsize_cache: HashMap::new(),
|
||||
})
|
||||
}
|
||||
|
||||
///
|
||||
/// Decode a PostgreSQL WAL record and store it in the repository, in the given timeline.
|
||||
///
|
||||
/// This function updates `lsn` field of `DatadirModification`
|
||||
///
|
||||
/// Helper function to parse a WAL record and call the Timeline's PUT functions for all the
|
||||
/// relations/pages that the record affects.
|
||||
@@ -82,6 +78,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
|
||||
modification: &mut DatadirModification<T>,
|
||||
decoded: &mut DecodedWALRecord,
|
||||
) -> Result<()> {
|
||||
modification.lsn = lsn;
|
||||
decode_wal_record(recdata, decoded).context("failed decoding wal record")?;
|
||||
|
||||
let mut buf = decoded.record.clone();
|
||||
@@ -260,7 +257,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
|
||||
|
||||
// Now that this record has been fully handled, including updating the
|
||||
// checkpoint data, let the repository know that it is up-to-date to this LSN
|
||||
modification.commit(lsn)?;
|
||||
modification.commit()?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -408,7 +405,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
|
||||
// replaying it would fail to find the previous image of the page, because
|
||||
// it doesn't exist. So check if the VM page(s) exist, and skip the WAL
|
||||
// record if it doesn't.
|
||||
let vm_size = self.get_relsize(vm_rel)?;
|
||||
let vm_size = self.get_relsize(vm_rel, modification.lsn)?;
|
||||
if let Some(blknum) = new_vm_blk {
|
||||
if blknum >= vm_size {
|
||||
new_vm_blk = None;
|
||||
@@ -880,7 +877,6 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
|
||||
modification: &mut DatadirModification<T>,
|
||||
rel: RelTag,
|
||||
) -> Result<()> {
|
||||
self.relsize_cache.insert(rel, 0);
|
||||
modification.put_rel_creation(rel, 0)?;
|
||||
Ok(())
|
||||
}
|
||||
@@ -916,7 +912,6 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
|
||||
nblocks: BlockNumber,
|
||||
) -> Result<()> {
|
||||
modification.put_rel_truncation(rel, nblocks)?;
|
||||
self.relsize_cache.insert(rel, nblocks);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -926,23 +921,16 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
|
||||
rel: RelTag,
|
||||
) -> Result<()> {
|
||||
modification.put_rel_drop(rel)?;
|
||||
self.relsize_cache.remove(&rel);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_relsize(&mut self, rel: RelTag) -> Result<BlockNumber> {
|
||||
if let Some(nblocks) = self.relsize_cache.get(&rel) {
|
||||
Ok(*nblocks)
|
||||
fn get_relsize(&mut self, rel: RelTag, lsn: Lsn) -> Result<BlockNumber> {
|
||||
let nblocks = if !self.timeline.get_rel_exists(rel, lsn)? {
|
||||
0
|
||||
} else {
|
||||
let last_lsn = self.timeline.get_last_record_lsn();
|
||||
let nblocks = if !self.timeline.get_rel_exists(rel, last_lsn)? {
|
||||
0
|
||||
} else {
|
||||
self.timeline.get_rel_size(rel, last_lsn)?
|
||||
};
|
||||
self.relsize_cache.insert(rel, nblocks);
|
||||
Ok(nblocks)
|
||||
}
|
||||
self.timeline.get_rel_size(rel, lsn)?
|
||||
};
|
||||
Ok(nblocks)
|
||||
}
|
||||
|
||||
fn handle_rel_extend(
|
||||
@@ -952,22 +940,16 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
|
||||
blknum: BlockNumber,
|
||||
) -> Result<()> {
|
||||
let new_nblocks = blknum + 1;
|
||||
let old_nblocks = if let Some(nblocks) = self.relsize_cache.get(&rel) {
|
||||
*nblocks
|
||||
// Check if the relation exists. We implicitly create relations on first
|
||||
// record.
|
||||
// TODO: would be nice if to be more explicit about it
|
||||
let last_lsn = modification.lsn;
|
||||
let old_nblocks = if !self.timeline.get_rel_exists(rel, last_lsn)? {
|
||||
// create it with 0 size initially, the logic below will extend it
|
||||
modification.put_rel_creation(rel, 0)?;
|
||||
0
|
||||
} else {
|
||||
// Check if the relation exists. We implicitly create relations on first
|
||||
// record.
|
||||
// TODO: would be nice if to be more explicit about it
|
||||
let last_lsn = self.timeline.get_last_record_lsn();
|
||||
let nblocks = if !self.timeline.get_rel_exists(rel, last_lsn)? {
|
||||
// create it with 0 size initially, the logic below will extend it
|
||||
modification.put_rel_creation(rel, 0)?;
|
||||
0
|
||||
} else {
|
||||
self.timeline.get_rel_size(rel, last_lsn)?
|
||||
};
|
||||
self.relsize_cache.insert(rel, nblocks);
|
||||
nblocks
|
||||
self.timeline.get_rel_size(rel, last_lsn)?
|
||||
};
|
||||
|
||||
if new_nblocks > old_nblocks {
|
||||
@@ -978,7 +960,6 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
|
||||
for gap_blknum in old_nblocks..blknum {
|
||||
modification.put_rel_page_image(rel, gap_blknum, ZERO_PAGE.clone())?;
|
||||
}
|
||||
self.relsize_cache.insert(rel, new_nblocks);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -1069,10 +1050,10 @@ mod tests {
|
||||
static ZERO_CHECKPOINT: Bytes = Bytes::from_static(&[0u8; SIZEOF_CHECKPOINT]);
|
||||
|
||||
fn init_walingest_test<T: DatadirTimeline>(tline: &T) -> Result<WalIngest<T>> {
|
||||
let mut m = tline.begin_modification();
|
||||
let mut m = tline.begin_modification(Lsn(0x10));
|
||||
m.put_checkpoint(ZERO_CHECKPOINT.clone())?;
|
||||
m.put_relmap_file(0, 111, Bytes::from(""))?; // dummy relmapper file
|
||||
m.commit(Lsn(0x10))?;
|
||||
m.commit()?;
|
||||
let walingest = WalIngest::new(tline, Lsn(0x10))?;
|
||||
|
||||
Ok(walingest)
|
||||
@@ -1084,19 +1065,19 @@ mod tests {
|
||||
let tline = create_test_timeline(repo, TIMELINE_ID)?;
|
||||
let mut walingest = init_walingest_test(&*tline)?;
|
||||
|
||||
let mut m = tline.begin_modification();
|
||||
let mut m = tline.begin_modification(Lsn(0x20));
|
||||
walingest.put_rel_creation(&mut m, TESTREL_A)?;
|
||||
walingest.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
|
||||
m.commit(Lsn(0x20))?;
|
||||
let mut m = tline.begin_modification();
|
||||
m.commit()?;
|
||||
let mut m = tline.begin_modification(Lsn(0x30));
|
||||
walingest.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 3"))?;
|
||||
m.commit(Lsn(0x30))?;
|
||||
let mut m = tline.begin_modification();
|
||||
m.commit()?;
|
||||
let mut m = tline.begin_modification(Lsn(0x40));
|
||||
walingest.put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1 at 4"))?;
|
||||
m.commit(Lsn(0x40))?;
|
||||
let mut m = tline.begin_modification();
|
||||
m.commit()?;
|
||||
let mut m = tline.begin_modification(Lsn(0x50));
|
||||
walingest.put_rel_page_image(&mut m, TESTREL_A, 2, TEST_IMG("foo blk 2 at 5"))?;
|
||||
m.commit(Lsn(0x50))?;
|
||||
m.commit()?;
|
||||
|
||||
assert_current_logical_size(&*tline, Lsn(0x50));
|
||||
|
||||
@@ -1142,9 +1123,9 @@ mod tests {
|
||||
);
|
||||
|
||||
// Truncate last block
|
||||
let mut m = tline.begin_modification();
|
||||
let mut m = tline.begin_modification(Lsn(0x60));
|
||||
walingest.put_rel_truncation(&mut m, TESTREL_A, 2)?;
|
||||
m.commit(Lsn(0x60))?;
|
||||
m.commit()?;
|
||||
assert_current_logical_size(&*tline, Lsn(0x60));
|
||||
|
||||
// Check reported size and contents after truncation
|
||||
@@ -1166,15 +1147,15 @@ mod tests {
|
||||
);
|
||||
|
||||
// Truncate to zero length
|
||||
let mut m = tline.begin_modification();
|
||||
let mut m = tline.begin_modification(Lsn(0x68));
|
||||
walingest.put_rel_truncation(&mut m, TESTREL_A, 0)?;
|
||||
m.commit(Lsn(0x68))?;
|
||||
m.commit()?;
|
||||
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x68))?, 0);
|
||||
|
||||
// Extend from 0 to 2 blocks, leaving a gap
|
||||
let mut m = tline.begin_modification();
|
||||
let mut m = tline.begin_modification(Lsn(0x70));
|
||||
walingest.put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1"))?;
|
||||
m.commit(Lsn(0x70))?;
|
||||
m.commit()?;
|
||||
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x70))?, 2);
|
||||
assert_eq!(
|
||||
tline.get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x70))?,
|
||||
@@ -1186,9 +1167,9 @@ mod tests {
|
||||
);
|
||||
|
||||
// Extend a lot more, leaving a big gap that spans across segments
|
||||
let mut m = tline.begin_modification();
|
||||
let mut m = tline.begin_modification(Lsn(0x80));
|
||||
walingest.put_rel_page_image(&mut m, TESTREL_A, 1500, TEST_IMG("foo blk 1500"))?;
|
||||
m.commit(Lsn(0x80))?;
|
||||
m.commit()?;
|
||||
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x80))?, 1501);
|
||||
for blk in 2..1500 {
|
||||
assert_eq!(
|
||||
@@ -1212,18 +1193,18 @@ mod tests {
|
||||
let tline = create_test_timeline(repo, TIMELINE_ID)?;
|
||||
let mut walingest = init_walingest_test(&*tline)?;
|
||||
|
||||
let mut m = tline.begin_modification();
|
||||
let mut m = tline.begin_modification(Lsn(0x20));
|
||||
walingest.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
|
||||
m.commit(Lsn(0x20))?;
|
||||
m.commit()?;
|
||||
|
||||
// Check that rel exists and size is correct
|
||||
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x20))?, true);
|
||||
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x20))?, 1);
|
||||
|
||||
// Drop rel
|
||||
let mut m = tline.begin_modification();
|
||||
let mut m = tline.begin_modification(Lsn(0x30));
|
||||
walingest.put_rel_drop(&mut m, TESTREL_A)?;
|
||||
m.commit(Lsn(0x30))?;
|
||||
m.commit()?;
|
||||
|
||||
// Check that rel is not visible anymore
|
||||
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x30))?, false);
|
||||
@@ -1232,9 +1213,9 @@ mod tests {
|
||||
//assert!(tline.get_rel_size(TESTREL_A, Lsn(0x30))?.is_none());
|
||||
|
||||
// Re-create it
|
||||
let mut m = tline.begin_modification();
|
||||
let mut m = tline.begin_modification(Lsn(0x40));
|
||||
walingest.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 4"))?;
|
||||
m.commit(Lsn(0x40))?;
|
||||
m.commit()?;
|
||||
|
||||
// Check that rel exists and size is correct
|
||||
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x40))?, true);
|
||||
@@ -1254,12 +1235,12 @@ mod tests {
|
||||
|
||||
// Create a 20 MB relation (the size is arbitrary)
|
||||
let relsize = 20 * 1024 * 1024 / 8192;
|
||||
let mut m = tline.begin_modification();
|
||||
let mut m = tline.begin_modification(Lsn(0x20));
|
||||
for blkno in 0..relsize {
|
||||
let data = format!("foo blk {} at {}", blkno, Lsn(0x20));
|
||||
walingest.put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data))?;
|
||||
}
|
||||
m.commit(Lsn(0x20))?;
|
||||
m.commit()?;
|
||||
|
||||
// The relation was created at LSN 20, not visible at LSN 1 yet.
|
||||
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x10))?, false);
|
||||
@@ -1280,9 +1261,9 @@ mod tests {
|
||||
|
||||
// Truncate relation so that second segment was dropped
|
||||
// - only leave one page
|
||||
let mut m = tline.begin_modification();
|
||||
let mut m = tline.begin_modification(Lsn(0x60));
|
||||
walingest.put_rel_truncation(&mut m, TESTREL_A, 1)?;
|
||||
m.commit(Lsn(0x60))?;
|
||||
m.commit()?;
|
||||
|
||||
// Check reported size and contents after truncation
|
||||
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x60))?, 1);
|
||||
@@ -1310,12 +1291,12 @@ mod tests {
|
||||
// Extend relation again.
|
||||
// Add enough blocks to create second segment
|
||||
let lsn = Lsn(0x80);
|
||||
let mut m = tline.begin_modification();
|
||||
let mut m = tline.begin_modification(lsn);
|
||||
for blkno in 0..relsize {
|
||||
let data = format!("foo blk {} at {}", blkno, lsn);
|
||||
walingest.put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data))?;
|
||||
}
|
||||
m.commit(lsn)?;
|
||||
m.commit()?;
|
||||
|
||||
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x80))?, true);
|
||||
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x80))?, relsize);
|
||||
@@ -1343,10 +1324,10 @@ mod tests {
|
||||
let mut lsn = 0x10;
|
||||
for blknum in 0..pg_constants::RELSEG_SIZE + 1 {
|
||||
lsn += 0x10;
|
||||
let mut m = tline.begin_modification();
|
||||
let mut m = tline.begin_modification(Lsn(lsn));
|
||||
let img = TEST_IMG(&format!("foo blk {} at {}", blknum, Lsn(lsn)));
|
||||
walingest.put_rel_page_image(&mut m, TESTREL_A, blknum as BlockNumber, img)?;
|
||||
m.commit(Lsn(lsn))?;
|
||||
m.commit()?;
|
||||
}
|
||||
|
||||
assert_current_logical_size(&*tline, Lsn(lsn));
|
||||
@@ -1358,9 +1339,9 @@ mod tests {
|
||||
|
||||
// Truncate one block
|
||||
lsn += 0x10;
|
||||
let mut m = tline.begin_modification();
|
||||
let mut m = tline.begin_modification(Lsn(lsn));
|
||||
walingest.put_rel_truncation(&mut m, TESTREL_A, pg_constants::RELSEG_SIZE)?;
|
||||
m.commit(Lsn(lsn))?;
|
||||
m.commit()?;
|
||||
assert_eq!(
|
||||
tline.get_rel_size(TESTREL_A, Lsn(lsn))?,
|
||||
pg_constants::RELSEG_SIZE
|
||||
@@ -1369,9 +1350,9 @@ mod tests {
|
||||
|
||||
// Truncate another block
|
||||
lsn += 0x10;
|
||||
let mut m = tline.begin_modification();
|
||||
let mut m = tline.begin_modification(Lsn(lsn));
|
||||
walingest.put_rel_truncation(&mut m, TESTREL_A, pg_constants::RELSEG_SIZE - 1)?;
|
||||
m.commit(Lsn(lsn))?;
|
||||
m.commit()?;
|
||||
assert_eq!(
|
||||
tline.get_rel_size(TESTREL_A, Lsn(lsn))?,
|
||||
pg_constants::RELSEG_SIZE - 1
|
||||
@@ -1383,9 +1364,9 @@ mod tests {
|
||||
let mut size: i32 = 3000;
|
||||
while size >= 0 {
|
||||
lsn += 0x10;
|
||||
let mut m = tline.begin_modification();
|
||||
let mut m = tline.begin_modification(Lsn(lsn));
|
||||
walingest.put_rel_truncation(&mut m, TESTREL_A, size as BlockNumber)?;
|
||||
m.commit(Lsn(lsn))?;
|
||||
m.commit()?;
|
||||
assert_eq!(
|
||||
tline.get_rel_size(TESTREL_A, Lsn(lsn))?,
|
||||
size as BlockNumber
|
||||
|
||||
@@ -154,7 +154,7 @@ pub async fn handle_walreceiver_connection(
|
||||
|
||||
{
|
||||
let mut decoded = DecodedWALRecord::default();
|
||||
let mut modification = timeline.begin_modification();
|
||||
let mut modification = timeline.begin_modification(endlsn);
|
||||
while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
|
||||
// let _enter = info_span!("processing record", lsn = %lsn).entered();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user