Make DatadirTimeline a trait, implemented by LayeredTimeline.

Previously DatadirTimeline was a separate struct, and there was a 1:1
relationship between each DatadirTimeline and LayeredTimeline. That was
a bit awkward; whenever you created a timeline, you also needed to create
the DatadirTimeline wrapper around it, and if you only had a reference
to the LayeredTimeline, you would need to look up the corresponding
DatadirTimeline struct through tenant_mgr::get_local_timeline_with_load().
There were a couple of calls like that from LayeredTimeline itself.

Refactor DatadirTimeline into a trait, and mark LayeredTimeline as
implementing it. That way, there's only one object, LayeredTimeline,
and you can call both Timeline and DatadirTimeline functions on it.
You can now also call DatadirTimeline functions from LayeredTimeline
itself.

I considered just moving all the functions from DatadirTimeline
directly to Timeline/LayeredTimeline, but I still like to have some
separation. Timeline provides a simple key-value API and handles
durably storing key/value pairs and branching, whereas DatadirTimeline
is stateless and provides an abstraction over the key-value store,
presenting an interface in terms of Postgres concepts: relations,
databases, and so on.
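
A minimal, self-contained sketch of the resulting pattern (the types
here are simplified stand-ins; the real traits live in repository.rs
and pgdatadir_mapping.rs in the diff below):

// Simplified stand-ins for the real types; illustration only.
type Key = u64;
type Lsn = u64;
type Bytes = Vec<u8>;

// The low-level, versioned key-value API (cf. repository::Timeline).
trait Timeline {
    fn get(&self, key: Key, lsn: Lsn) -> Bytes;
    fn get_last_record_lsn(&self) -> Lsn;
}

// Stateless Postgres-level operations, expressed entirely through the
// Timeline methods via default implementations (cf. DatadirTimeline).
trait DatadirTimeline: Timeline {
    fn get_rel_size(&self, rel_size_key: Key, lsn: Lsn) -> u32 {
        let buf = self.get(rel_size_key, lsn);
        u32::from_le_bytes(buf[..4].try_into().unwrap())
    }
}

// Stand-in for the real struct.
struct LayeredTimeline;

impl Timeline for LayeredTimeline {
    fn get(&self, _key: Key, _lsn: Lsn) -> Bytes {
        4u32.to_le_bytes().to_vec()
    }
    fn get_last_record_lsn(&self) -> Lsn {
        0
    }
}

// The one line this commit effectively adds: every default method
// becomes callable on LayeredTimeline itself.
impl DatadirTimeline for LayeredTimeline {}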

This simplified the fast path for the logical size calculation at
branch creation, introduced in commit 28243d68e6. LayeredTimeline can
now access the ancestor's logical size directly, so the caller no
longer needs to pass it in. I moved the fast path into the
init_logical_size() function itself. It now checks whether the
ancestor's last LSN is the same as the branch point, i.e. whether
there have been any changes on the ancestor after the branch, and if
there have been none, copies the size from there. An additional bonus
is that the optimization now works any time a timeline is a branch of
another timeline with no changes since the branch point, not only at
the create-branch command.
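
A hedged sketch of the scenario this enables (the driver code and the
exact Repository method names are assumptions; error handling elided):

// Branch 'child' off 'parent' at the parent's current end, then load it.
let end_lsn = parent_timeline.get_last_record_lsn();
repo.branch_timeline(parent_id, child_id, end_lsn)?;
let child_timeline = repo.get_timeline_load(child_id)?;

// init_logical_size() takes the fast path: the child has no changes of
// its own, and the parent hasn't advanced past the branch point, so the
// parent's size is copied instead of recomputed from scratch.
child_timeline.init_logical_size()?;
assert_eq!(
    child_timeline.get_current_logical_size(),
    parent_timeline.get_current_logical_size()
);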
Author: Heikki Linnakangas
Date:   2022-07-27 10:26:21 +03:00
Parent: 5a4394a8df
Commit: d6f12cff8e
13 changed files with 361 additions and 412 deletions


@@ -23,8 +23,7 @@ use tar::{Builder, EntryType, Header};
use tracing::*;
use crate::reltag::{RelTag, SlruKind};
use crate::repository::Timeline;
use crate::DatadirTimelineImpl;
use crate::DatadirTimeline;
use postgres_ffi::xlog_utils::*;
use postgres_ffi::*;
use utils::lsn::Lsn;
@@ -32,12 +31,13 @@ use utils::lsn::Lsn;
/// This is short-living object only for the time of tarball creation,
/// created mostly to avoid passing a lot of parameters between various functions
/// used for constructing tarball.
pub struct Basebackup<'a, W>
pub struct Basebackup<'a, W, T>
where
W: Write,
T: DatadirTimeline,
{
ar: Builder<AbortableWrite<W>>,
timeline: &'a Arc<DatadirTimelineImpl>,
timeline: &'a Arc<T>,
pub lsn: Lsn,
prev_record_lsn: Lsn,
full_backup: bool,
@@ -52,17 +52,18 @@ where
// * When working without safekeepers. In this situation it is important to match the lsn
// we are taking basebackup on with the lsn that is used in pageserver's walreceiver
// to start the replication.
impl<'a, W> Basebackup<'a, W>
impl<'a, W, T> Basebackup<'a, W, T>
where
W: Write,
T: DatadirTimeline,
{
pub fn new(
write: W,
timeline: &'a Arc<DatadirTimelineImpl>,
timeline: &'a Arc<T>,
req_lsn: Option<Lsn>,
prev_lsn: Option<Lsn>,
full_backup: bool,
) -> Result<Basebackup<'a, W>> {
) -> Result<Basebackup<'a, W, T>> {
// Compute postgres doesn't have any previous WAL files, but the first
// record that it's going to write needs to include the LSN of the
// previous record (xl_prev). We include prev_record_lsn in the
@@ -79,13 +80,13 @@ where
let (backup_prev, backup_lsn) = if let Some(req_lsn) = req_lsn {
// Backup was requested at a particular LSN. Wait for it to arrive.
info!("waiting for {}", req_lsn);
timeline.tline.wait_lsn(req_lsn)?;
timeline.wait_lsn(req_lsn)?;
// If the requested point is the end of the timeline, we can
// provide prev_lsn. (get_last_record_rlsn() might return it as
// zero, though, if no WAL has been generated on this timeline
// yet.)
let end_of_timeline = timeline.tline.get_last_record_rlsn();
let end_of_timeline = timeline.get_last_record_rlsn();
if req_lsn == end_of_timeline.last {
(end_of_timeline.prev, req_lsn)
} else {
@@ -93,7 +94,7 @@ where
}
} else {
// Backup was requested at end of the timeline.
let end_of_timeline = timeline.tline.get_last_record_rlsn();
let end_of_timeline = timeline.get_last_record_rlsn();
(end_of_timeline.prev, end_of_timeline.last)
};
@@ -371,7 +372,7 @@ where
// add zenith.signal file
let mut zenith_signal = String::new();
if self.prev_record_lsn == Lsn(0) {
if self.lsn == self.timeline.tline.get_ancestor_lsn() {
if self.lsn == self.timeline.get_ancestor_lsn() {
write!(zenith_signal, "PREV LSN: none")?;
} else {
write!(zenith_signal, "PREV LSN: invalid")?;
@@ -402,9 +403,10 @@ where
}
}
impl<'a, W> Drop for Basebackup<'a, W>
impl<'a, W, T> Drop for Basebackup<'a, W, T>
where
W: Write,
T: DatadirTimeline,
{
/// If the basebackup was not finished, prevent the Archive::drop() from
/// writing the end-of-archive marker.
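
With Basebackup generic over T: DatadirTimeline, callers pass the
LayeredTimeline directly instead of the old wrapper; a hedged usage
sketch ('out' and the send method name are assumptions):

// Hypothetical call site; 'out' is any std::io::Write destination.
let timeline = tenant_mgr::get_local_timeline_with_load(tenant_id, timeline_id)?;
let basebackup = Basebackup::new(out, &timeline, req_lsn, prev_lsn, false)?;
basebackup.send_tarball()?; // method name assumed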


@@ -13,8 +13,6 @@ use walkdir::WalkDir;
use crate::pgdatadir_mapping::*;
use crate::reltag::{RelTag, SlruKind};
use crate::repository::Repository;
use crate::repository::Timeline;
use crate::walingest::WalIngest;
use crate::walrecord::DecodedWALRecord;
use postgres_ffi::relfile_utils::*;
@@ -30,9 +28,9 @@ use utils::lsn::Lsn;
/// This is currently only used to import a cluster freshly created by initdb.
/// The code that deals with the checkpoint would not work right if the
/// cluster was not shut down cleanly.
pub fn import_timeline_from_postgres_datadir<R: Repository>(
pub fn import_timeline_from_postgres_datadir<T: DatadirTimeline>(
path: &Path,
tline: &mut DatadirTimeline<R>,
tline: &T,
lsn: Lsn,
) -> Result<()> {
let mut pg_control: Option<ControlFileData> = None;
@@ -90,8 +88,8 @@ pub fn import_timeline_from_postgres_datadir<R: Repository>(
}
// subroutine of import_timeline_from_postgres_datadir(), to load one relation file.
fn import_rel<R: Repository, Reader: Read>(
modification: &mut DatadirModification<R>,
fn import_rel<T: DatadirTimeline, Reader: Read>(
modification: &mut DatadirModification<T>,
path: &Path,
spcoid: Oid,
dboid: Oid,
@@ -170,8 +168,8 @@ fn import_rel<R: Repository, Reader: Read>(
/// Import an SLRU segment file
///
fn import_slru<R: Repository, Reader: Read>(
modification: &mut DatadirModification<R>,
fn import_slru<T: DatadirTimeline, Reader: Read>(
modification: &mut DatadirModification<T>,
slru: SlruKind,
path: &Path,
mut reader: Reader,
@@ -226,9 +224,9 @@ fn import_slru<R: Repository, Reader: Read>(
/// Scan PostgreSQL WAL files in given directory and load all records between
/// 'startpoint' and 'endpoint' into the repository.
fn import_wal<R: Repository>(
fn import_wal<T: DatadirTimeline>(
walpath: &Path,
tline: &mut DatadirTimeline<R>,
tline: &T,
startpoint: Lsn,
endpoint: Lsn,
) -> Result<()> {
@@ -297,8 +295,8 @@ fn import_wal<R: Repository>(
Ok(())
}
pub fn import_basebackup_from_tar<R: Repository, Reader: Read>(
tline: &mut DatadirTimeline<R>,
pub fn import_basebackup_from_tar<T: DatadirTimeline, Reader: Read>(
tline: &T,
reader: Reader,
base_lsn: Lsn,
) -> Result<()> {
@@ -339,8 +337,8 @@ pub fn import_basebackup_from_tar<R: Repository, Reader: Read>(
Ok(())
}
pub fn import_wal_from_tar<R: Repository, Reader: Read>(
tline: &mut DatadirTimeline<R>,
pub fn import_wal_from_tar<T: DatadirTimeline, Reader: Read>(
tline: &T,
reader: Reader,
start_lsn: Lsn,
end_lsn: Lsn,
@@ -420,8 +418,8 @@ pub fn import_wal_from_tar<R: Repository, Reader: Read>(
Ok(())
}
pub fn import_file<R: Repository, Reader: Read>(
modification: &mut DatadirModification<R>,
pub fn import_file<T: DatadirTimeline, Reader: Read>(
modification: &mut DatadirModification<T>,
file_path: &Path,
reader: Reader,
len: usize,
@@ -540,7 +538,7 @@ pub fn import_file<R: Repository, Reader: Read>(
// zenith.signal is not necessarily the last file, that we handle
// but it is ok to call `finish_write()`, because final `modification.commit()`
// will update lsn once more to the final one.
let writer = modification.tline.tline.writer();
let writer = modification.tline.writer();
writer.finish_write(prev_lsn);
debug!("imported zenith signal {}", prev_lsn);


@@ -32,7 +32,6 @@ use crate::storage_sync::index::RemoteIndex;
use crate::tenant_config::{TenantConf, TenantConfOpt};
use crate::repository::{GcResult, Repository, RepositoryTimeline, Timeline};
use crate::tenant_mgr;
use crate::thread_mgr;
use crate::walredo::WalRedoManager;
use crate::CheckpointConfig;
@@ -181,7 +180,6 @@ impl Repository for LayeredRepository {
self.tenant_id,
Arc::clone(&self.walredo_mgr),
self.upload_layers,
None,
);
timeline.layers.write().unwrap().next_open_layer_at = Some(initdb_lsn);
@@ -246,20 +244,6 @@ impl Repository for LayeredRepository {
));
}
}
// Copy logical size from source timeline if we are branching on the last position.
let init_logical_size =
if let Ok(src_pgdir) = tenant_mgr::get_local_timeline_with_load(self.tenant_id, src) {
let logical_size = src_pgdir.get_current_logical_size();
// Check LSN after getting logical size to exclude race condition
// when ancestor timeline is concurrently updated
if src_timeline.get_last_record_lsn() == start_lsn {
Some(logical_size)
} else {
None
}
} else {
None
};
// Determine prev-LSN for the new timeline. We can only determine it if
// the timeline was branched at the current end of the source timeline.
@@ -290,14 +274,7 @@ impl Repository for LayeredRepository {
);
crashsafe_dir::create_dir_all(self.conf.timeline_path(&dst, &self.tenant_id))?;
timeline::save_metadata(self.conf, dst, self.tenant_id, &metadata, true)?;
timelines.insert(
dst,
LayeredTimelineEntry::Unloaded {
id: dst,
metadata,
init_logical_size,
},
);
timelines.insert(dst, LayeredTimelineEntry::Unloaded { id: dst, metadata });
info!("branched timeline {} from {} at {}", dst, src, start_lsn);
@@ -433,7 +410,7 @@ impl Repository for LayeredRepository {
// we need to get metadata of a timeline, another option is to pass it along with Downloaded status
let metadata = load_metadata(self.conf, timeline_id, self.tenant_id).context("failed to load local metadata")?;
// finally we make newly downloaded timeline visible to repository
entry.insert(LayeredTimelineEntry::Unloaded { id: timeline_id, metadata, init_logical_size: None })
entry.insert(LayeredTimelineEntry::Unloaded { id: timeline_id, metadata })
},
};
Ok(())
@@ -551,18 +528,13 @@ impl LayeredRepository {
timelineid: ZTimelineId,
timelines: &mut HashMap<ZTimelineId, LayeredTimelineEntry>,
) -> anyhow::Result<Option<Arc<LayeredTimeline>>> {
let logical_size: Option<usize>;
match timelines.get(&timelineid) {
Some(entry) => match entry {
LayeredTimelineEntry::Loaded(local_timeline) => {
debug!("timeline {} found loaded into memory", &timelineid);
return Ok(Some(Arc::clone(local_timeline)));
}
LayeredTimelineEntry::Unloaded {
init_logical_size, ..
} => {
logical_size = *init_logical_size;
}
LayeredTimelineEntry::Unloaded { .. } => {}
},
None => {
debug!("timeline {} not found", &timelineid);
@@ -573,7 +545,7 @@ impl LayeredRepository {
"timeline {} found on a local disk, but not loaded into the memory, loading",
&timelineid
);
let timeline = self.load_local_timeline(timelineid, timelines, logical_size)?;
let timeline = self.load_local_timeline(timelineid, timelines)?;
let was_loaded = timelines.insert(
timelineid,
LayeredTimelineEntry::Loaded(Arc::clone(&timeline)),
@@ -590,7 +562,6 @@ impl LayeredRepository {
&self,
timeline_id: ZTimelineId,
timelines: &mut HashMap<ZTimelineId, LayeredTimelineEntry>,
init_logical_size: Option<usize>,
) -> anyhow::Result<Arc<LayeredTimeline>> {
let metadata = load_metadata(self.conf, timeline_id, self.tenant_id)
.context("failed to load metadata")?;
@@ -617,7 +588,6 @@ impl LayeredRepository {
self.tenant_id,
Arc::clone(&self.walredo_mgr),
self.upload_layers,
init_logical_size,
);
timeline
.load_layer_map(disk_consistent_lsn)


@@ -14,7 +14,7 @@ use std::fs::{File, OpenOptions};
use std::io::Write;
use std::ops::{Deref, Range};
use std::path::PathBuf;
use std::sync::atomic::{self, AtomicBool};
use std::sync::atomic::{self, AtomicBool, AtomicIsize, Ordering as AtomicOrdering};
use std::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard, TryLockError};
use std::time::{Duration, SystemTime};
@@ -39,6 +39,7 @@ use crate::config::PageServerConf;
use crate::keyspace::{KeyPartitioning, KeySpace};
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::tenant_config::TenantConfOpt;
use crate::DatadirTimeline;
use postgres_ffi::xlog_utils::to_pg_timestamp;
use utils::{
@@ -49,7 +50,6 @@ use utils::{
use crate::repository::{GcResult, RepositoryTimeline, Timeline, TimelineWriter};
use crate::repository::{Key, Value};
use crate::tenant_mgr;
use crate::thread_mgr;
use crate::virtual_file::VirtualFile;
use crate::walreceiver::IS_WAL_RECEIVER;
@@ -122,7 +122,6 @@ pub enum LayeredTimelineEntry {
Unloaded {
id: ZTimelineId,
metadata: TimelineMetadata,
init_logical_size: Option<usize>,
},
}
@@ -269,11 +268,21 @@ pub struct LayeredTimeline {
// though lets keep them both for better error visibility.
pub initdb_lsn: Lsn,
// Initial logical size of timeline (if known).
// Logical size can be copied from ancestor timeline when new branch is create at last LSN
pub init_logical_size: Option<usize>,
/// When did we last calculate the partitioning?
partitioning: Mutex<(KeyPartitioning, Lsn)>,
/// Configuration: how often should the partitioning be recalculated.
repartition_threshold: u64,
/// Current logical size of the "datadir", at the last LSN.
current_logical_size: AtomicIsize,
}
/// Inherit all the functions from DatadirTimeline, to provide the
/// functionality to store PostgreSQL relations, SLRUs, etc. in a
/// LayeredTimeline.
impl DatadirTimeline for LayeredTimeline {}
///
/// Information about how much history needs to be retained, needed by
/// Garbage Collection.
@@ -472,7 +481,6 @@ impl LayeredTimeline {
tenant_id: ZTenantId,
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
upload_layers: bool,
init_logical_size: Option<usize>,
) -> LayeredTimeline {
let reconstruct_time_histo = RECONSTRUCT_TIME
.get_metric_with_label_values(&[&tenant_id.to_string(), &timeline_id.to_string()])
@@ -508,7 +516,7 @@ impl LayeredTimeline {
.get_metric_with_label_values(&[&tenant_id.to_string(), &timeline_id.to_string()])
.unwrap();
LayeredTimeline {
let mut result = LayeredTimeline {
conf,
tenant_conf,
timeline_id,
@@ -551,8 +559,13 @@ impl LayeredTimeline {
latest_gc_cutoff_lsn: RwLock::new(metadata.latest_gc_cutoff_lsn()),
initdb_lsn: metadata.initdb_lsn(),
init_logical_size,
}
current_logical_size: AtomicIsize::new(0),
partitioning: Mutex::new((KeyPartitioning::new(), Lsn(0))),
repartition_threshold: 0,
};
result.repartition_threshold = result.get_checkpoint_distance() / 10;
result
}
///
@@ -634,6 +647,58 @@ impl LayeredTimeline {
Ok(())
}
/// (Re-)calculate the logical size of the database at the latest LSN.
///
/// This can be a slow operation.
pub fn init_logical_size(&self) -> Result<()> {
// Try a fast-path first:
// Copy logical size from ancestor timeline if there has been no changes on this
// branch, and no changes on the ancestor branch since the branch point.
if self.get_ancestor_lsn() == self.get_last_record_lsn() && self.ancestor_timeline.is_some()
{
let ancestor = self.get_ancestor_timeline()?;
let ancestor_logical_size = ancestor.get_current_logical_size();
// Check LSN after getting logical size to exclude race condition
// when ancestor timeline is concurrently updated.
//
// Logical size 0 means that it was not initialized, so don't believe that.
if ancestor_logical_size != 0 && ancestor.get_last_record_lsn() == self.ancestor_lsn {
self.current_logical_size
.store(ancestor_logical_size as isize, AtomicOrdering::SeqCst);
debug!(
"logical size copied from ancestor: {}",
ancestor_logical_size
);
return Ok(());
}
}
// Have to calculate it the hard way
let last_lsn = self.get_last_record_lsn();
let logical_size = self.get_current_logical_size_non_incremental(last_lsn)?;
self.current_logical_size
.store(logical_size as isize, AtomicOrdering::SeqCst);
debug!("calculated logical size the hard way: {}", logical_size);
Ok(())
}
/// Retrieve current logical size of the timeline
///
/// NOTE: counted incrementally, includes ancestors,
pub fn get_current_logical_size(&self) -> usize {
let current_logical_size = self.current_logical_size.load(AtomicOrdering::Acquire);
match usize::try_from(current_logical_size) {
Ok(sz) => sz,
Err(_) => {
error!(
"current_logical_size is out of range: {}",
current_logical_size
);
0
}
}
}
///
/// Get a handle to a Layer for reading.
///
@@ -1003,18 +1068,16 @@ impl LayeredTimeline {
// files instead. This is possible as long as *all* the data imported into the
// repository have the same LSN.
let lsn_range = frozen_layer.get_lsn_range();
let layer_paths_to_upload = if lsn_range.start == self.initdb_lsn
&& lsn_range.end == Lsn(self.initdb_lsn.0 + 1)
{
let pgdir = tenant_mgr::get_local_timeline_with_load(self.tenant_id, self.timeline_id)?;
let (partitioning, _lsn) =
pgdir.repartition(self.initdb_lsn, self.get_compaction_target_size())?;
self.create_image_layers(&partitioning, self.initdb_lsn, true)?
} else {
// normal case, write out a L0 delta layer file.
let delta_path = self.create_delta_layer(&frozen_layer)?;
HashSet::from([delta_path])
};
let layer_paths_to_upload =
if lsn_range.start == self.initdb_lsn && lsn_range.end == Lsn(self.initdb_lsn.0 + 1) {
let (partitioning, _lsn) =
self.repartition(self.initdb_lsn, self.get_compaction_target_size())?;
self.create_image_layers(&partitioning, self.initdb_lsn, true)?
} else {
// normal case, write out a L0 delta layer file.
let delta_path = self.create_delta_layer(&frozen_layer)?;
HashSet::from([delta_path])
};
fail_point!("flush-frozen-before-sync");
@@ -1186,38 +1249,56 @@ impl LayeredTimeline {
let target_file_size = self.get_checkpoint_distance();
// Define partitioning schema if needed
if let Ok(pgdir) =
tenant_mgr::get_local_timeline_with_load(self.tenant_id, self.timeline_id)
{
// 2. Create new image layers for partitions that have been modified
// "enough".
let (partitioning, lsn) = pgdir.repartition(
self.get_last_record_lsn(),
self.get_compaction_target_size(),
)?;
let layer_paths_to_upload = self.create_image_layers(&partitioning, lsn, false)?;
if !layer_paths_to_upload.is_empty()
&& self.upload_layers.load(atomic::Ordering::Relaxed)
{
storage_sync::schedule_layer_upload(
self.tenant_id,
self.timeline_id,
HashSet::from_iter(layer_paths_to_upload),
None,
);
}
// 3. Compact
let timer = self.compact_time_histo.start_timer();
self.compact_level0(target_file_size)?;
timer.stop_and_record();
} else {
debug!("Could not compact because no partitioning specified yet");
}
match self.repartition(
self.get_last_record_lsn(),
self.get_compaction_target_size(),
) {
Ok((partitioning, lsn)) => {
// 2. Create new image layers for partitions that have been modified
// "enough".
let layer_paths_to_upload = self.create_image_layers(&partitioning, lsn, false)?;
if !layer_paths_to_upload.is_empty()
&& self.upload_layers.load(atomic::Ordering::Relaxed)
{
storage_sync::schedule_layer_upload(
self.tenant_id,
self.timeline_id,
HashSet::from_iter(layer_paths_to_upload),
None,
);
}
// 3. Compact
let timer = self.compact_time_histo.start_timer();
self.compact_level0(target_file_size)?;
timer.stop_and_record();
}
Err(err) => {
// no partitioning? This is normal, if the timeline was just created
// as an empty timeline. Also in unit tests, when we use the timeline
// as a simple key-value store, ignoring the datadir layout. Log the
// error but continue.
error!("could not compact, repartitioning keyspace failed: {err:?}");
}
};
Ok(())
}
fn repartition(&self, lsn: Lsn, partition_size: u64) -> Result<(KeyPartitioning, Lsn)> {
let mut partitioning_guard = self.partitioning.lock().unwrap();
if partitioning_guard.1 == Lsn(0)
|| lsn.0 - partitioning_guard.1 .0 > self.repartition_threshold
{
let keyspace = self.collect_keyspace(lsn)?;
let partitioning = keyspace.partition(partition_size);
*partitioning_guard = (partitioning, lsn);
return Ok((partitioning_guard.0.clone(), lsn));
}
Ok((partitioning_guard.0.clone(), partitioning_guard.1))
}
// Is it time to create a new image layer for the given partition?
fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> Result<bool> {
let layers = self.layers.read().unwrap();
@@ -1626,19 +1707,21 @@ impl LayeredTimeline {
// Calculate pitr cutoff point.
// If we cannot determine a cutoff LSN, be conservative and don't GC anything.
let mut pitr_cutoff_lsn: Lsn = *self.get_latest_gc_cutoff_lsn();
let mut pitr_cutoff_lsn: Lsn;
if pitr != Duration::ZERO {
// conservative, safe default is to remove nothing, when we have no
// commit timestamp data available
pitr_cutoff_lsn = *self.get_latest_gc_cutoff_lsn();
if let Ok(timeline) =
tenant_mgr::get_local_timeline_with_load(self.tenant_id, self.timeline_id)
{
let now = SystemTime::now();
// First, calculate pitr_cutoff_timestamp and then convert it to LSN.
// If we don't have enough data to convert to LSN,
// play safe and don't remove any layers.
let now = SystemTime::now();
if let Some(pitr_cutoff_timestamp) = now.checked_sub(pitr) {
let pitr_timestamp = to_pg_timestamp(pitr_cutoff_timestamp);
match timeline.find_lsn_for_timestamp(pitr_timestamp)? {
match self.find_lsn_for_timestamp(pitr_timestamp)? {
LsnForTimestamp::Present(lsn) => pitr_cutoff_lsn = lsn,
LsnForTimestamp::Future(lsn) => {
debug!("future({})", lsn);
@@ -1653,9 +1736,10 @@ impl LayeredTimeline {
}
debug!("pitr_cutoff_lsn = {:?}", pitr_cutoff_lsn)
}
} else if cfg!(test) {
// We don't have local timeline in mocked cargo tests.
// So, just ignore pitr_interval setting in this case.
} else {
// No time-based retention. (Some unit tests depend on garbage-collection
// working even when CLOG data is missing, so that find_lsn_for_timestamp()
// above doesn't work.)
pitr_cutoff_lsn = gc_info.horizon_cutoff;
}
gc_info.pitr_cutoff = pitr_cutoff_lsn;
@@ -1962,6 +2046,12 @@ impl<'a> TimelineWriter<'_> for LayeredTimelineWriter<'a> {
fn finish_write(&self, new_lsn: Lsn) {
self.tl.finish_write(new_lsn);
}
fn update_current_logical_size(&self, delta: isize) {
self.tl
.current_logical_size
.fetch_add(delta, AtomicOrdering::SeqCst);
}
}
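
DatadirModification now updates the size counter through the new
TimelineWriter method instead of reaching into LayeredTimeline's
internals; a sketch of the commit path, assuming the writer API shown
in this patch:

// Inside DatadirModification::commit(), roughly:
let writer = self.tline.writer();
// ... flush the pending key/value updates through the writer ...
writer.finish_write(lsn);
if pending_nblocks != 0 {
    // Net change in relation blocks, converted to a byte delta.
    writer.update_current_logical_size(pending_nblocks * pg_constants::BLCKSZ as isize);
}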
/// Add a suffix to a layer file's name: .{num}.old


@@ -63,8 +63,7 @@ pub enum CheckpointConfig {
}
pub type RepositoryImpl = LayeredRepository;
pub type DatadirTimelineImpl = DatadirTimeline<RepositoryImpl>;
pub type TimelineImpl = <LayeredRepository as repository::Repository>::Timeline;
pub fn shutdown_pageserver(exit_code: i32) {
// Shut down the libpq endpoint thread. This prevents new connections from


@@ -30,7 +30,6 @@ use utils::{
use crate::basebackup;
use crate::config::{PageServerConf, ProfilingConfig};
use crate::import_datadir::{import_basebackup_from_tar, import_wal_from_tar};
use crate::layered_repository::LayeredRepository;
use crate::pgdatadir_mapping::{DatadirTimeline, LsnForTimestamp};
use crate::profiling::profpoint_start;
use crate::reltag::RelTag;
@@ -555,9 +554,6 @@ impl PageServerHandler {
info!("creating new timeline");
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?;
let timeline = repo.create_empty_timeline(timeline_id, base_lsn)?;
let repartition_distance = repo.get_checkpoint_distance();
let mut datadir_timeline =
DatadirTimeline::<LayeredRepository>::new(timeline, repartition_distance);
// TODO mark timeline as not ready until it reaches end_lsn.
// We might have some wal to import as well, and we should prevent compute
@@ -573,7 +569,7 @@ impl PageServerHandler {
info!("importing basebackup");
pgb.write_message(&BeMessage::CopyInResponse)?;
let reader = CopyInReader::new(pgb);
import_basebackup_from_tar(&mut datadir_timeline, reader, base_lsn)?;
import_basebackup_from_tar(&*timeline, reader, base_lsn)?;
// TODO check checksum
// Meanwhile you can verify client-side by taking fullbackup
@@ -583,7 +579,7 @@ impl PageServerHandler {
// Flush data to disk, then upload to s3
info!("flushing layers");
datadir_timeline.tline.checkpoint(CheckpointConfig::Flush)?;
timeline.checkpoint(CheckpointConfig::Flush)?;
info!("done");
Ok(())
@@ -605,10 +601,6 @@ impl PageServerHandler {
let timeline = repo.get_timeline_load(timeline_id)?;
ensure!(timeline.get_last_record_lsn() == start_lsn);
let repartition_distance = repo.get_checkpoint_distance();
let mut datadir_timeline =
DatadirTimeline::<LayeredRepository>::new(timeline, repartition_distance);
// TODO leave clean state on error. For now you can use detach to clean
// up broken state from a failed import.
@@ -616,16 +608,16 @@ impl PageServerHandler {
info!("importing wal");
pgb.write_message(&BeMessage::CopyInResponse)?;
let reader = CopyInReader::new(pgb);
import_wal_from_tar(&mut datadir_timeline, reader, start_lsn, end_lsn)?;
import_wal_from_tar(&*timeline, reader, start_lsn, end_lsn)?;
// TODO Does it make sense to overshoot?
ensure!(datadir_timeline.tline.get_last_record_lsn() >= end_lsn);
ensure!(timeline.get_last_record_lsn() >= end_lsn);
// Flush data to disk, then upload to s3. No need for a forced checkpoint.
// We only want to persist the data, and it doesn't matter if it's in the
// shape of deltas or images.
info!("flushing layers");
datadir_timeline.tline.checkpoint(CheckpointConfig::Flush)?;
timeline.checkpoint(CheckpointConfig::Flush)?;
info!("done");
Ok(())
@@ -643,8 +635,8 @@ impl PageServerHandler {
/// In either case, if the page server hasn't received the WAL up to the
/// requested LSN yet, we will wait for it to arrive. The return value is
/// the LSN that should be used to look up the page versions.
fn wait_or_get_last_lsn<R: Repository>(
timeline: &DatadirTimeline<R>,
fn wait_or_get_last_lsn<T: DatadirTimeline>(
timeline: &T,
mut lsn: Lsn,
latest: bool,
latest_gc_cutoff_lsn: &RwLockReadGuard<Lsn>,
@@ -671,7 +663,7 @@ impl PageServerHandler {
if lsn <= last_record_lsn {
lsn = last_record_lsn;
} else {
timeline.tline.wait_lsn(lsn)?;
timeline.wait_lsn(lsn)?;
// Since we waited for 'lsn' to arrive, that is now the last
// record LSN. (Or close enough for our purposes; the
// last-record LSN can advance immediately after we return
@@ -681,7 +673,7 @@ impl PageServerHandler {
if lsn == Lsn(0) {
bail!("invalid LSN(0) in request");
}
timeline.tline.wait_lsn(lsn)?;
timeline.wait_lsn(lsn)?;
}
ensure!(
lsn >= **latest_gc_cutoff_lsn,
@@ -691,14 +683,14 @@ impl PageServerHandler {
Ok(lsn)
}
fn handle_get_rel_exists_request<R: Repository>(
fn handle_get_rel_exists_request<T: DatadirTimeline>(
&self,
timeline: &DatadirTimeline<R>,
timeline: &T,
req: &PagestreamExistsRequest,
) -> Result<PagestreamBeMessage> {
let _enter = info_span!("get_rel_exists", rel = %req.rel, req_lsn = %req.lsn).entered();
let latest_gc_cutoff_lsn = timeline.tline.get_latest_gc_cutoff_lsn();
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn)?;
let exists = timeline.get_rel_exists(req.rel, lsn)?;
@@ -708,13 +700,13 @@ impl PageServerHandler {
}))
}
fn handle_get_nblocks_request<R: Repository>(
fn handle_get_nblocks_request<T: DatadirTimeline>(
&self,
timeline: &DatadirTimeline<R>,
timeline: &T,
req: &PagestreamNblocksRequest,
) -> Result<PagestreamBeMessage> {
let _enter = info_span!("get_nblocks", rel = %req.rel, req_lsn = %req.lsn).entered();
let latest_gc_cutoff_lsn = timeline.tline.get_latest_gc_cutoff_lsn();
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn)?;
let n_blocks = timeline.get_rel_size(req.rel, lsn)?;
@@ -724,13 +716,13 @@ impl PageServerHandler {
}))
}
fn handle_db_size_request<R: Repository>(
fn handle_db_size_request<T: DatadirTimeline>(
&self,
timeline: &DatadirTimeline<R>,
timeline: &T,
req: &PagestreamDbSizeRequest,
) -> Result<PagestreamBeMessage> {
let _enter = info_span!("get_db_size", dbnode = %req.dbnode, req_lsn = %req.lsn).entered();
let latest_gc_cutoff_lsn = timeline.tline.get_latest_gc_cutoff_lsn();
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn)?;
let total_blocks =
@@ -743,14 +735,14 @@ impl PageServerHandler {
}))
}
fn handle_get_page_at_lsn_request<R: Repository>(
fn handle_get_page_at_lsn_request<T: DatadirTimeline>(
&self,
timeline: &DatadirTimeline<R>,
timeline: &T,
req: &PagestreamGetPageRequest,
) -> Result<PagestreamBeMessage> {
let _enter = info_span!("get_page", rel = %req.rel, blkno = &req.blkno, req_lsn = %req.lsn)
.entered();
let latest_gc_cutoff_lsn = timeline.tline.get_latest_gc_cutoff_lsn();
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn)?;
/*
// Add a 1s delay to some requests. The delayed causes the requests to
@@ -783,7 +775,7 @@ impl PageServerHandler {
// check that the timeline exists
let timeline = tenant_mgr::get_local_timeline_with_load(tenantid, timelineid)
.context("Cannot load local timeline")?;
let latest_gc_cutoff_lsn = timeline.tline.get_latest_gc_cutoff_lsn();
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
if let Some(lsn) = lsn {
timeline
.check_lsn_is_in_scope(lsn, &latest_gc_cutoff_lsn)
@@ -921,7 +913,7 @@ impl postgres_backend::Handler for PageServerHandler {
let timeline = tenant_mgr::get_local_timeline_with_load(tenantid, timelineid)
.context("Cannot load local timeline")?;
let end_of_timeline = timeline.tline.get_last_record_rlsn();
let end_of_timeline = timeline.get_last_record_rlsn();
pgb.write_message_noflush(&BeMessage::RowDescription(&[
RowDescriptor::text_col(b"prev_lsn"),
@@ -1139,7 +1131,7 @@ impl postgres_backend::Handler for PageServerHandler {
let timelineid = ZTimelineId::from_str(caps.get(2).unwrap().as_str())?;
let timeline = tenant_mgr::get_local_timeline_with_load(tenantid, timelineid)
.context("Couldn't load timeline")?;
timeline.tline.compact()?;
timeline.compact()?;
pgb.write_message_noflush(&SINGLE_COL_ROWDESC)?
.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
@@ -1160,7 +1152,7 @@ impl postgres_backend::Handler for PageServerHandler {
.context("Cannot load local timeline")?;
// Checkpoint the timeline and also compact it (due to `CheckpointConfig::Forced`).
timeline.tline.checkpoint(CheckpointConfig::Forced)?;
timeline.checkpoint(CheckpointConfig::Forced)?;
pgb.write_message_noflush(&SINGLE_COL_ROWDESC)?
.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;


@@ -6,10 +6,10 @@
//! walingest.rs handles a few things like implicit relation creation and extension.
//! Clarify that)
//!
use crate::keyspace::{KeyPartitioning, KeySpace, KeySpaceAccum};
use crate::keyspace::{KeySpace, KeySpaceAccum};
use crate::reltag::{RelTag, SlruKind};
use crate::repository::Timeline;
use crate::repository::*;
use crate::repository::{Repository, Timeline};
use crate::walrecord::ZenithWalRecord;
use anyhow::{bail, ensure, Result};
use bytes::{Buf, Bytes};
@@ -18,34 +18,12 @@ use postgres_ffi::{pg_constants, Oid, TransactionId};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::ops::Range;
use std::sync::atomic::{AtomicIsize, Ordering};
use std::sync::{Arc, Mutex, RwLockReadGuard};
use tracing::{debug, error, trace, warn};
use tracing::{debug, trace, warn};
use utils::{bin_ser::BeSer, lsn::Lsn};
/// Block number within a relation or SLRU. This matches PostgreSQL's BlockNumber type.
pub type BlockNumber = u32;
pub struct DatadirTimeline<R>
where
R: Repository,
{
/// The underlying key-value store. Callers should not read or modify the
/// data in the underlying store directly. However, it is exposed to have
/// access to information like last-LSN, ancestor, and operations like
/// compaction.
pub tline: Arc<R::Timeline>,
/// When did we last calculate the partitioning?
partitioning: Mutex<(KeyPartitioning, Lsn)>,
/// Configuration: how often should the partitioning be recalculated.
repartition_threshold: u64,
/// Current logical size of the "datadir", at the last LSN.
current_logical_size: AtomicIsize,
}
#[derive(Debug)]
pub enum LsnForTimestamp {
Present(Lsn),
@@ -54,34 +32,24 @@ pub enum LsnForTimestamp {
NoData(Lsn),
}
impl<R: Repository> DatadirTimeline<R> {
pub fn new(tline: Arc<R::Timeline>, repartition_threshold: u64) -> Self {
DatadirTimeline {
tline,
partitioning: Mutex::new((KeyPartitioning::new(), Lsn(0))),
current_logical_size: AtomicIsize::new(0),
repartition_threshold,
}
}
/// (Re-)calculate the logical size of the database at the latest LSN.
///
/// This can be a slow operation.
pub fn init_logical_size(&self) -> Result<()> {
let last_lsn = self.tline.get_last_record_lsn();
self.current_logical_size.store(
self.get_current_logical_size_non_incremental(last_lsn)? as isize,
Ordering::SeqCst,
);
Ok(())
}
/// Set timeline logical size.
pub fn set_logical_size(&self, size: usize) {
self.current_logical_size
.store(size as isize, Ordering::SeqCst);
}
///
/// This trait provides all the functionality to store PostgreSQL relations, SLRUs,
/// and other special kinds of files, in a versioned key-value store. The
/// Timeline trait provides the key-value store.
///
/// This is a trait, so that we can easily include all these functions in a Timeline
/// implementation. You're not expected to have different implementations of this trait,
/// rather, this provides an interface and implementation, over Timeline.
///
/// If you wanted to store other kinds of data in the Neon repository, e.g.
/// flat files or MySQL, you would create a new trait like this, with all the
/// functions that make sense for the kind of data you're storing. For flat files,
/// for example, you might have a function like "fn read(path, offset, size)".
/// We might also have that situation in the future, to support multiple PostgreSQL
/// versions, if there are big changes in how the data is organized in the data
/// directory, or if new special files are introduced.
///
pub trait DatadirTimeline: Timeline {
/// Start ingesting a WAL record, or other atomic modification of
/// the timeline.
///
@@ -102,7 +70,10 @@ impl<R: Repository> DatadirTimeline<R> {
/// functions of the timeline until you finish! And if you update the
/// same page twice, the last update wins.
///
pub fn begin_modification(&self) -> DatadirModification<R> {
fn begin_modification(&self) -> DatadirModification<Self>
where
Self: Sized,
{
DatadirModification {
tline: self,
pending_updates: HashMap::new(),
@@ -116,7 +87,7 @@ impl<R: Repository> DatadirTimeline<R> {
//------------------------------------------------------------------------------
/// Look up given page version.
pub fn get_rel_page_at_lsn(&self, tag: RelTag, blknum: BlockNumber, lsn: Lsn) -> Result<Bytes> {
fn get_rel_page_at_lsn(&self, tag: RelTag, blknum: BlockNumber, lsn: Lsn) -> Result<Bytes> {
ensure!(tag.relnode != 0, "invalid relnode");
let nblocks = self.get_rel_size(tag, lsn)?;
@@ -129,11 +100,11 @@ impl<R: Repository> DatadirTimeline<R> {
}
let key = rel_block_to_key(tag, blknum);
self.tline.get(key, lsn)
self.get(key, lsn)
}
// Get size of a database in blocks
pub fn get_db_size(&self, spcnode: Oid, dbnode: Oid, lsn: Lsn) -> Result<usize> {
fn get_db_size(&self, spcnode: Oid, dbnode: Oid, lsn: Lsn) -> Result<usize> {
let mut total_blocks = 0;
let rels = self.list_rels(spcnode, dbnode, lsn)?;
@@ -146,7 +117,7 @@ impl<R: Repository> DatadirTimeline<R> {
}
/// Get size of a relation file
pub fn get_rel_size(&self, tag: RelTag, lsn: Lsn) -> Result<BlockNumber> {
fn get_rel_size(&self, tag: RelTag, lsn: Lsn) -> Result<BlockNumber> {
ensure!(tag.relnode != 0, "invalid relnode");
if (tag.forknum == pg_constants::FSM_FORKNUM
@@ -161,17 +132,17 @@ impl<R: Repository> DatadirTimeline<R> {
}
let key = rel_size_to_key(tag);
let mut buf = self.tline.get(key, lsn)?;
let mut buf = self.get(key, lsn)?;
Ok(buf.get_u32_le())
}
/// Does relation exist?
pub fn get_rel_exists(&self, tag: RelTag, lsn: Lsn) -> Result<bool> {
fn get_rel_exists(&self, tag: RelTag, lsn: Lsn) -> Result<bool> {
ensure!(tag.relnode != 0, "invalid relnode");
// fetch directory listing
let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
let buf = self.tline.get(key, lsn)?;
let buf = self.get(key, lsn)?;
let dir = RelDirectory::des(&buf)?;
let exists = dir.rels.get(&(tag.relnode, tag.forknum)).is_some();
@@ -180,10 +151,10 @@ impl<R: Repository> DatadirTimeline<R> {
}
/// Get a list of all existing relations in given tablespace and database.
pub fn list_rels(&self, spcnode: Oid, dbnode: Oid, lsn: Lsn) -> Result<HashSet<RelTag>> {
fn list_rels(&self, spcnode: Oid, dbnode: Oid, lsn: Lsn) -> Result<HashSet<RelTag>> {
// fetch directory listing
let key = rel_dir_to_key(spcnode, dbnode);
let buf = self.tline.get(key, lsn)?;
let buf = self.get(key, lsn)?;
let dir = RelDirectory::des(&buf)?;
let rels: HashSet<RelTag> =
@@ -198,7 +169,7 @@ impl<R: Repository> DatadirTimeline<R> {
}
/// Look up given SLRU page version.
pub fn get_slru_page_at_lsn(
fn get_slru_page_at_lsn(
&self,
kind: SlruKind,
segno: u32,
@@ -206,26 +177,21 @@ impl<R: Repository> DatadirTimeline<R> {
lsn: Lsn,
) -> Result<Bytes> {
let key = slru_block_to_key(kind, segno, blknum);
self.tline.get(key, lsn)
self.get(key, lsn)
}
/// Get size of an SLRU segment
pub fn get_slru_segment_size(
&self,
kind: SlruKind,
segno: u32,
lsn: Lsn,
) -> Result<BlockNumber> {
fn get_slru_segment_size(&self, kind: SlruKind, segno: u32, lsn: Lsn) -> Result<BlockNumber> {
let key = slru_segment_size_to_key(kind, segno);
let mut buf = self.tline.get(key, lsn)?;
let mut buf = self.get(key, lsn)?;
Ok(buf.get_u32_le())
}
/// Get size of an SLRU segment
pub fn get_slru_segment_exists(&self, kind: SlruKind, segno: u32, lsn: Lsn) -> Result<bool> {
fn get_slru_segment_exists(&self, kind: SlruKind, segno: u32, lsn: Lsn) -> Result<bool> {
// fetch directory listing
let key = slru_dir_to_key(kind);
let buf = self.tline.get(key, lsn)?;
let buf = self.get(key, lsn)?;
let dir = SlruSegmentDirectory::des(&buf)?;
let exists = dir.segments.get(&segno).is_some();
@@ -239,10 +205,10 @@ impl<R: Repository> DatadirTimeline<R> {
/// so it's not well defined which LSN you get if there were multiple commits
/// "in flight" at that point in time.
///
pub fn find_lsn_for_timestamp(&self, search_timestamp: TimestampTz) -> Result<LsnForTimestamp> {
let gc_cutoff_lsn_guard = self.tline.get_latest_gc_cutoff_lsn();
fn find_lsn_for_timestamp(&self, search_timestamp: TimestampTz) -> Result<LsnForTimestamp> {
let gc_cutoff_lsn_guard = self.get_latest_gc_cutoff_lsn();
let min_lsn = *gc_cutoff_lsn_guard;
let max_lsn = self.tline.get_last_record_lsn();
let max_lsn = self.get_last_record_lsn();
// LSNs are always 8-byte aligned. low/mid/high represent the
// LSN divided by 8.
@@ -333,88 +299,51 @@ impl<R: Repository> DatadirTimeline<R> {
}
/// Get a list of SLRU segments
pub fn list_slru_segments(&self, kind: SlruKind, lsn: Lsn) -> Result<HashSet<u32>> {
fn list_slru_segments(&self, kind: SlruKind, lsn: Lsn) -> Result<HashSet<u32>> {
// fetch directory entry
let key = slru_dir_to_key(kind);
let buf = self.tline.get(key, lsn)?;
let buf = self.get(key, lsn)?;
let dir = SlruSegmentDirectory::des(&buf)?;
Ok(dir.segments)
}
pub fn get_relmap_file(&self, spcnode: Oid, dbnode: Oid, lsn: Lsn) -> Result<Bytes> {
fn get_relmap_file(&self, spcnode: Oid, dbnode: Oid, lsn: Lsn) -> Result<Bytes> {
let key = relmap_file_key(spcnode, dbnode);
let buf = self.tline.get(key, lsn)?;
let buf = self.get(key, lsn)?;
Ok(buf)
}
pub fn list_dbdirs(&self, lsn: Lsn) -> Result<HashMap<(Oid, Oid), bool>> {
fn list_dbdirs(&self, lsn: Lsn) -> Result<HashMap<(Oid, Oid), bool>> {
// fetch directory entry
let buf = self.tline.get(DBDIR_KEY, lsn)?;
let buf = self.get(DBDIR_KEY, lsn)?;
let dir = DbDirectory::des(&buf)?;
Ok(dir.dbdirs)
}
pub fn get_twophase_file(&self, xid: TransactionId, lsn: Lsn) -> Result<Bytes> {
fn get_twophase_file(&self, xid: TransactionId, lsn: Lsn) -> Result<Bytes> {
let key = twophase_file_key(xid);
let buf = self.tline.get(key, lsn)?;
let buf = self.get(key, lsn)?;
Ok(buf)
}
pub fn list_twophase_files(&self, lsn: Lsn) -> Result<HashSet<TransactionId>> {
fn list_twophase_files(&self, lsn: Lsn) -> Result<HashSet<TransactionId>> {
// fetch directory entry
let buf = self.tline.get(TWOPHASEDIR_KEY, lsn)?;
let buf = self.get(TWOPHASEDIR_KEY, lsn)?;
let dir = TwoPhaseDirectory::des(&buf)?;
Ok(dir.xids)
}
pub fn get_control_file(&self, lsn: Lsn) -> Result<Bytes> {
self.tline.get(CONTROLFILE_KEY, lsn)
fn get_control_file(&self, lsn: Lsn) -> Result<Bytes> {
self.get(CONTROLFILE_KEY, lsn)
}
pub fn get_checkpoint(&self, lsn: Lsn) -> Result<Bytes> {
self.tline.get(CHECKPOINT_KEY, lsn)
}
/// Get the LSN of the last ingested WAL record.
///
/// This is just a convenience wrapper that calls through to the underlying
/// repository.
pub fn get_last_record_lsn(&self) -> Lsn {
self.tline.get_last_record_lsn()
}
/// Check that it is valid to request operations with that lsn.
///
/// This is just a convenience wrapper that calls through to the underlying
/// repository.
pub fn check_lsn_is_in_scope(
&self,
lsn: Lsn,
latest_gc_cutoff_lsn: &RwLockReadGuard<Lsn>,
) -> Result<()> {
self.tline.check_lsn_is_in_scope(lsn, latest_gc_cutoff_lsn)
}
/// Retrieve current logical size of the timeline
///
/// NOTE: counted incrementally, includes ancestors,
pub fn get_current_logical_size(&self) -> usize {
let current_logical_size = self.current_logical_size.load(Ordering::Acquire);
match usize::try_from(current_logical_size) {
Ok(sz) => sz,
Err(_) => {
error!(
"current_logical_size is out of range: {}",
current_logical_size
);
0
}
}
fn get_checkpoint(&self, lsn: Lsn) -> Result<Bytes> {
self.get(CHECKPOINT_KEY, lsn)
}
/// Does the same as get_current_logical_size but counted on demand.
@@ -422,16 +351,16 @@ impl<R: Repository> DatadirTimeline<R> {
///
/// Only relation blocks are counted currently. That excludes metadata,
/// SLRUs, twophase files etc.
pub fn get_current_logical_size_non_incremental(&self, lsn: Lsn) -> Result<usize> {
fn get_current_logical_size_non_incremental(&self, lsn: Lsn) -> Result<usize> {
// Fetch list of database dirs and iterate them
let buf = self.tline.get(DBDIR_KEY, lsn)?;
let buf = self.get(DBDIR_KEY, lsn)?;
let dbdir = DbDirectory::des(&buf)?;
let mut total_size: usize = 0;
for (spcnode, dbnode) in dbdir.dbdirs.keys() {
for rel in self.list_rels(*spcnode, *dbnode, lsn)? {
let relsize_key = rel_size_to_key(rel);
let mut buf = self.tline.get(relsize_key, lsn)?;
let mut buf = self.get(relsize_key, lsn)?;
let relsize = buf.get_u32_le();
total_size += relsize as usize;
@@ -452,7 +381,7 @@ impl<R: Repository> DatadirTimeline<R> {
result.add_key(DBDIR_KEY);
// Fetch list of database dirs and iterate them
let buf = self.tline.get(DBDIR_KEY, lsn)?;
let buf = self.get(DBDIR_KEY, lsn)?;
let dbdir = DbDirectory::des(&buf)?;
let mut dbs: Vec<(Oid, Oid)> = dbdir.dbdirs.keys().cloned().collect();
@@ -469,7 +398,7 @@ impl<R: Repository> DatadirTimeline<R> {
rels.sort_unstable();
for rel in rels {
let relsize_key = rel_size_to_key(rel);
let mut buf = self.tline.get(relsize_key, lsn)?;
let mut buf = self.get(relsize_key, lsn)?;
let relsize = buf.get_u32_le();
result.add_range(rel_block_to_key(rel, 0)..rel_block_to_key(rel, relsize));
@@ -485,13 +414,13 @@ impl<R: Repository> DatadirTimeline<R> {
] {
let slrudir_key = slru_dir_to_key(kind);
result.add_key(slrudir_key);
let buf = self.tline.get(slrudir_key, lsn)?;
let buf = self.get(slrudir_key, lsn)?;
let dir = SlruSegmentDirectory::des(&buf)?;
let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
segments.sort_unstable();
for segno in segments {
let segsize_key = slru_segment_size_to_key(kind, segno);
let mut buf = self.tline.get(segsize_key, lsn)?;
let mut buf = self.get(segsize_key, lsn)?;
let segsize = buf.get_u32_le();
result.add_range(
@@ -503,7 +432,7 @@ impl<R: Repository> DatadirTimeline<R> {
// Then pg_twophase
result.add_key(TWOPHASEDIR_KEY);
let buf = self.tline.get(TWOPHASEDIR_KEY, lsn)?;
let buf = self.get(TWOPHASEDIR_KEY, lsn)?;
let twophase_dir = TwoPhaseDirectory::des(&buf)?;
let mut xids: Vec<TransactionId> = twophase_dir.xids.iter().cloned().collect();
xids.sort_unstable();
@@ -516,30 +445,17 @@ impl<R: Repository> DatadirTimeline<R> {
Ok(result.to_keyspace())
}
pub fn repartition(&self, lsn: Lsn, partition_size: u64) -> Result<(KeyPartitioning, Lsn)> {
let mut partitioning_guard = self.partitioning.lock().unwrap();
if partitioning_guard.1 == Lsn(0)
|| lsn.0 - partitioning_guard.1 .0 > self.repartition_threshold
{
let keyspace = self.collect_keyspace(lsn)?;
let partitioning = keyspace.partition(partition_size);
*partitioning_guard = (partitioning, lsn);
return Ok((partitioning_guard.0.clone(), lsn));
}
Ok((partitioning_guard.0.clone(), partitioning_guard.1))
}
}
/// DatadirModification represents an operation to ingest an atomic set of
/// updates to the repository. It is created by the 'begin_record'
/// function. It is called for each WAL record, so that all the modifications
/// by a one WAL record appear atomic.
pub struct DatadirModification<'a, R: Repository> {
pub struct DatadirModification<'a, T: DatadirTimeline> {
/// The timeline this modification applies to. You can access this to
/// read the state, but note that any pending updates are *not* reflected
/// in the state in 'tline' yet.
pub tline: &'a DatadirTimeline<R>,
pub tline: &'a T,
// The modifications are not applied directly to the underlying key-value store.
// The put-functions add the modifications here, and they are flushed to the
@@ -549,7 +465,7 @@ pub struct DatadirModification<'a, R: Repository> {
pending_nblocks: isize,
}
impl<'a, R: Repository> DatadirModification<'a, R> {
impl<'a, T: DatadirTimeline> DatadirModification<'a, T> {
/// Initialize a completely new repository.
///
/// This inserts the directory metadata entries that are assumed to
@@ -934,7 +850,7 @@ impl<'a, R: Repository> DatadirModification<'a, R> {
return Ok(());
}
let writer = self.tline.tline.writer();
let writer = self.tline.writer();
// Flush relation and SLRU data blocks, keep metadata.
let mut result: Result<()> = Ok(());
@@ -949,10 +865,7 @@ impl<'a, R: Repository> DatadirModification<'a, R> {
result?;
if pending_nblocks != 0 {
self.tline.current_logical_size.fetch_add(
pending_nblocks * pg_constants::BLCKSZ as isize,
Ordering::SeqCst,
);
writer.update_current_logical_size(pending_nblocks * pg_constants::BLCKSZ as isize);
self.pending_nblocks = 0;
}
@@ -965,7 +878,7 @@ impl<'a, R: Repository> DatadirModification<'a, R> {
/// All the modifications in this atomic update are stamped by the specified LSN.
///
pub fn commit(&mut self, lsn: Lsn) -> Result<()> {
let writer = self.tline.tline.writer();
let writer = self.tline.writer();
let pending_nblocks = self.pending_nblocks;
self.pending_nblocks = 0;
@@ -980,10 +893,7 @@ impl<'a, R: Repository> DatadirModification<'a, R> {
writer.finish_write(lsn);
if pending_nblocks != 0 {
self.tline.current_logical_size.fetch_add(
pending_nblocks * pg_constants::BLCKSZ as isize,
Ordering::SeqCst,
);
writer.update_current_logical_size(pending_nblocks * pg_constants::BLCKSZ as isize);
}
Ok(())
@@ -1010,7 +920,7 @@ impl<'a, R: Repository> DatadirModification<'a, R> {
}
} else {
let last_lsn = self.tline.get_last_record_lsn();
self.tline.tline.get(key, last_lsn)
self.tline.get(key, last_lsn)
}
}
@@ -1412,13 +1322,12 @@ fn is_slru_block_key(key: Key) -> bool {
pub fn create_test_timeline<R: Repository>(
repo: R,
timeline_id: utils::zid::ZTimelineId,
) -> Result<Arc<crate::DatadirTimeline<R>>> {
) -> Result<std::sync::Arc<R::Timeline>> {
let tline = repo.create_empty_timeline(timeline_id, Lsn(8))?;
let tline = DatadirTimeline::new(tline, 256 * 1024);
let mut m = tline.begin_modification();
m.init_empty()?;
m.commit(Lsn(8))?;
Ok(Arc::new(tline))
Ok(tline)
}
#[allow(clippy::bool_assert_comparison)]
@@ -1491,7 +1400,7 @@ mod tests {
.contains(&TESTREL_A));
// Run checkpoint and garbage collection and check that it's still not visible
newtline.tline.checkpoint(CheckpointConfig::Forced)?;
newtline.checkpoint(CheckpointConfig::Forced)?;
repo.gc_iteration(Some(NEW_TIMELINE_ID), 0, true)?;
assert!(!newtline


@@ -185,7 +185,7 @@ impl Value {
/// A repository corresponds to one .neon directory. One repository holds multiple
/// timelines, forked off from the same initial call to 'initdb'.
pub trait Repository: Send + Sync {
type Timeline: Timeline;
type Timeline: crate::DatadirTimeline;
/// Updates timeline based on the `TimelineSyncStatusUpdate`, received from the remote storage synchronization.
/// See [`crate::remote_storage`] for more details about the synchronization.
@@ -405,6 +405,8 @@ pub trait TimelineWriter<'a> {
/// the 'lsn' or anything older. The previous last record LSN is stored alongside
/// the latest and can be read.
fn finish_write(&self, lsn: Lsn);
fn update_current_logical_size(&self, delta: isize);
}
#[cfg(test)]


@@ -3,7 +3,6 @@
use crate::config::PageServerConf;
use crate::layered_repository::{load_metadata, LayeredRepository};
use crate::pgdatadir_mapping::DatadirTimeline;
use crate::repository::Repository;
use crate::storage_sync::index::{RemoteIndex, RemoteTimelineIndex};
use crate::storage_sync::{self, LocalTimelineInitStatus, SyncStartupData};
@@ -12,7 +11,7 @@ use crate::thread_mgr::ThreadKind;
use crate::timelines::CreateRepo;
use crate::walredo::PostgresRedoManager;
use crate::{thread_mgr, timelines, walreceiver};
use crate::{DatadirTimelineImpl, RepositoryImpl};
use crate::{RepositoryImpl, TimelineImpl};
use anyhow::Context;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
@@ -101,7 +100,7 @@ struct Tenant {
///
/// Local timelines have more metadata that's loaded into memory,
/// that is located in the `repo.timelines` field, [`crate::layered_repository::LayeredTimelineEntry`].
local_timelines: HashMap<ZTimelineId, Arc<DatadirTimelineImpl>>,
local_timelines: HashMap<ZTimelineId, Arc<<RepositoryImpl as Repository>::Timeline>>,
}
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
@@ -178,7 +177,7 @@ pub enum LocalTimelineUpdate {
},
Attach {
id: ZTenantTimelineId,
datadir: Arc<DatadirTimelineImpl>,
datadir: Arc<<RepositoryImpl as Repository>::Timeline>,
},
}
@@ -382,7 +381,7 @@ pub fn get_repository_for_tenant(tenant_id: ZTenantId) -> anyhow::Result<Arc<Rep
pub fn get_local_timeline_with_load(
tenant_id: ZTenantId,
timeline_id: ZTimelineId,
) -> anyhow::Result<Arc<DatadirTimelineImpl>> {
) -> anyhow::Result<Arc<TimelineImpl>> {
let mut m = tenants_state::write_tenants();
let tenant = m
.get_mut(&tenant_id)
@@ -489,27 +488,18 @@ pub fn detach_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> any
fn load_local_timeline(
repo: &RepositoryImpl,
timeline_id: ZTimelineId,
) -> anyhow::Result<Arc<DatadirTimeline<LayeredRepository>>> {
) -> anyhow::Result<Arc<TimelineImpl>> {
let inmem_timeline = repo.get_timeline_load(timeline_id).with_context(|| {
format!("Inmem timeline {timeline_id} not found in tenant's repository")
})?;
let repartition_distance = repo.get_checkpoint_distance() / 10;
let init_logical_size = inmem_timeline.init_logical_size;
let page_tline = Arc::new(DatadirTimelineImpl::new(
inmem_timeline,
repartition_distance,
));
if let Some(logical_size) = init_logical_size {
page_tline.set_logical_size(logical_size);
} else {
page_tline.init_logical_size()?;
}
inmem_timeline.init_logical_size()?;
tenants_state::try_send_timeline_update(LocalTimelineUpdate::Attach {
id: ZTenantTimelineId::new(repo.tenant_id(), timeline_id),
datadir: Arc::clone(&page_tline),
datadir: Arc::clone(&inmem_timeline),
});
Ok(page_tline)
Ok(inmem_timeline)
}
#[serde_as]


@@ -26,7 +26,7 @@ use crate::{
repository::{LocalTimelineState, Repository},
storage_sync::index::RemoteIndex,
tenant_config::TenantConfOpt,
DatadirTimeline, RepositoryImpl,
DatadirTimeline, RepositoryImpl, TimelineImpl,
};
use crate::{import_datadir, LOG_FILE_NAME};
use crate::{layered_repository::LayeredRepository, walredo::WalRedoManager};
@@ -54,27 +54,27 @@ pub struct LocalTimelineInfo {
}
impl LocalTimelineInfo {
pub fn from_loaded_timeline<R: Repository>(
datadir_tline: &DatadirTimeline<R>,
pub fn from_loaded_timeline(
timeline: &TimelineImpl,
include_non_incremental_logical_size: bool,
) -> anyhow::Result<Self> {
let last_record_lsn = datadir_tline.tline.get_last_record_lsn();
let last_record_lsn = timeline.get_last_record_lsn();
let info = LocalTimelineInfo {
ancestor_timeline_id: datadir_tline.tline.get_ancestor_timeline_id(),
ancestor_timeline_id: timeline.get_ancestor_timeline_id(),
ancestor_lsn: {
match datadir_tline.tline.get_ancestor_lsn() {
match timeline.get_ancestor_lsn() {
Lsn(0) => None,
lsn @ Lsn(_) => Some(lsn),
}
},
disk_consistent_lsn: datadir_tline.tline.get_disk_consistent_lsn(),
disk_consistent_lsn: timeline.get_disk_consistent_lsn(),
last_record_lsn,
prev_record_lsn: Some(datadir_tline.tline.get_prev_record_lsn()),
latest_gc_cutoff_lsn: *datadir_tline.tline.get_latest_gc_cutoff_lsn(),
prev_record_lsn: Some(timeline.get_prev_record_lsn()),
latest_gc_cutoff_lsn: *timeline.get_latest_gc_cutoff_lsn(),
timeline_state: LocalTimelineState::Loaded,
current_logical_size: Some(datadir_tline.get_current_logical_size()),
current_logical_size: Some(timeline.get_current_logical_size()),
current_logical_size_non_incremental: if include_non_incremental_logical_size {
Some(datadir_tline.get_current_logical_size_non_incremental(last_record_lsn)?)
Some(timeline.get_current_logical_size_non_incremental(last_record_lsn)?)
} else {
None
},
@@ -109,9 +109,8 @@ impl LocalTimelineInfo {
) -> anyhow::Result<Self> {
match repo_timeline {
RepositoryTimeline::Loaded(_) => {
let datadir_tline =
tenant_mgr::get_local_timeline_with_load(tenant_id, timeline_id)?;
Self::from_loaded_timeline(&datadir_tline, include_non_incremental_logical_size)
let timeline = tenant_mgr::get_local_timeline_with_load(tenant_id, timeline_id)?;
Self::from_loaded_timeline(&*timeline, include_non_incremental_logical_size)
}
RepositoryTimeline::Unloaded { metadata } => Ok(Self::from_unloaded_timeline(metadata)),
}
@@ -298,19 +297,18 @@ fn bootstrap_timeline<R: Repository>(
// Initdb lsn will be equal to last_record_lsn which will be set after import.
// Because we know it upfront avoid having an option or dummy zero value by passing it to create_empty_timeline.
let timeline = repo.create_empty_timeline(tli, lsn)?;
let mut page_tline: DatadirTimeline<R> = DatadirTimeline::new(timeline, u64::MAX);
import_datadir::import_timeline_from_postgres_datadir(&pgdata_path, &mut page_tline, lsn)?;
import_datadir::import_timeline_from_postgres_datadir(&pgdata_path, &*timeline, lsn)?;
fail::fail_point!("before-checkpoint-new-timeline", |_| {
bail!("failpoint before-checkpoint-new-timeline");
});
page_tline.tline.checkpoint(CheckpointConfig::Forced)?;
timeline.checkpoint(CheckpointConfig::Forced)?;
info!(
"created root timeline {} timeline.lsn {}",
tli,
page_tline.tline.get_last_record_lsn()
timeline.get_last_record_lsn()
);
// Remove temp dir. We don't need it anymore
@@ -389,7 +387,7 @@ pub(crate) fn create_timeline(
// load the timeline into memory
let loaded_timeline =
tenant_mgr::get_local_timeline_with_load(tenant_id, new_timeline_id)?;
LocalTimelineInfo::from_loaded_timeline(&loaded_timeline, false)
LocalTimelineInfo::from_loaded_timeline(&*loaded_timeline, false)
.context("cannot fill timeline info")?
}
None => {
@@ -397,7 +395,7 @@ pub(crate) fn create_timeline(
// load the timeline into memory
let new_timeline =
tenant_mgr::get_local_timeline_with_load(tenant_id, new_timeline_id)?;
LocalTimelineInfo::from_loaded_timeline(&new_timeline, false)
LocalTimelineInfo::from_loaded_timeline(&*new_timeline, false)
.context("cannot fill timeline info")?
}
};


@@ -34,7 +34,6 @@ use std::collections::HashMap;
use crate::pgdatadir_mapping::*;
use crate::reltag::{RelTag, SlruKind};
use crate::repository::Repository;
use crate::walrecord::*;
use postgres_ffi::nonrelfile_utils::mx_offset_to_member_segment;
use postgres_ffi::xlog_utils::*;
@@ -44,8 +43,8 @@ use utils::lsn::Lsn;
static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; 8192]);
pub struct WalIngest<'a, R: Repository> {
timeline: &'a DatadirTimeline<R>,
pub struct WalIngest<'a, T: DatadirTimeline> {
timeline: &'a T,
checkpoint: CheckPoint,
checkpoint_modified: bool,
@@ -53,8 +52,8 @@ pub struct WalIngest<'a, R: Repository> {
relsize_cache: HashMap<RelTag, BlockNumber>,
}
impl<'a, R: Repository> WalIngest<'a, R> {
pub fn new(timeline: &DatadirTimeline<R>, startpoint: Lsn) -> Result<WalIngest<R>> {
impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
pub fn new(timeline: &T, startpoint: Lsn) -> Result<WalIngest<T>> {
// Fetch the latest checkpoint into memory, so that we can compare with it
// quickly in `ingest_record` and update it when it changes.
let checkpoint_bytes = timeline.get_checkpoint(startpoint)?;
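
Aside: a condensed sketch of the new `WalIngest` shape (the real struct also carries checkpoint state and a relation-size cache, and its constructor is fallible):

    trait DatadirTimeline {} // simplified stand-in for the real trait

    struct WalIngest<'a, T: DatadirTimeline> {
        timeline: &'a T,
    }

    impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
        fn new(timeline: &'a T) -> WalIngest<'a, T> {
            // Any implementor of the trait can drive ingestion; previously
            // this constructor required the concrete wrapper struct.
            WalIngest { timeline }
        }
    }
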
@@ -80,7 +79,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
&mut self,
recdata: Bytes,
lsn: Lsn,
modification: &mut DatadirModification<R>,
modification: &mut DatadirModification<T>,
decoded: &mut DecodedWALRecord,
) -> Result<()> {
decode_wal_record(recdata, decoded).context("failed decoding wal record")?;
@@ -268,7 +267,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
fn ingest_decoded_block(
&mut self,
modification: &mut DatadirModification<R>,
modification: &mut DatadirModification<T>,
lsn: Lsn,
decoded: &DecodedWALRecord,
blk: &DecodedBkpBlock,
@@ -328,7 +327,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
fn ingest_heapam_record(
&mut self,
buf: &mut Bytes,
modification: &mut DatadirModification<R>,
modification: &mut DatadirModification<T>,
decoded: &mut DecodedWALRecord,
) -> Result<()> {
// Handle VM bit updates that are implicitly part of heap records.
@@ -472,7 +471,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
/// Subroutine of ingest_record(), to handle an XLOG_DBASE_CREATE record.
fn ingest_xlog_dbase_create(
&mut self,
modification: &mut DatadirModification<R>,
modification: &mut DatadirModification<T>,
rec: &XlCreateDatabase,
) -> Result<()> {
let db_id = rec.db_id;
@@ -539,7 +538,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
fn ingest_xlog_smgr_create(
&mut self,
modification: &mut DatadirModification<R>,
modification: &mut DatadirModification<T>,
rec: &XlSmgrCreate,
) -> Result<()> {
let rel = RelTag {
@@ -557,7 +556,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
/// This is the same logic as in PostgreSQL's smgr_redo() function.
fn ingest_xlog_smgr_truncate(
&mut self,
modification: &mut DatadirModification<R>,
modification: &mut DatadirModification<T>,
rec: &XlSmgrTruncate,
) -> Result<()> {
let spcnode = rec.rnode.spcnode;
@@ -622,7 +621,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
///
fn ingest_xact_record(
&mut self,
modification: &mut DatadirModification<R>,
modification: &mut DatadirModification<T>,
parsed: &XlXactParsedRecord,
is_commit: bool,
) -> Result<()> {
@@ -691,7 +690,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
fn ingest_clog_truncate_record(
&mut self,
modification: &mut DatadirModification<R>,
modification: &mut DatadirModification<T>,
xlrec: &XlClogTruncate,
) -> Result<()> {
info!(
@@ -749,7 +748,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
fn ingest_multixact_create_record(
&mut self,
modification: &mut DatadirModification<R>,
modification: &mut DatadirModification<T>,
xlrec: &XlMultiXactCreate,
) -> Result<()> {
// Create WAL record for updating the multixact-offsets page
@@ -828,7 +827,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
fn ingest_multixact_truncate_record(
&mut self,
modification: &mut DatadirModification<R>,
modification: &mut DatadirModification<T>,
xlrec: &XlMultiXactTruncate,
) -> Result<()> {
self.checkpoint.oldestMulti = xlrec.end_trunc_off;
@@ -862,7 +861,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
fn ingest_relmap_page(
&mut self,
modification: &mut DatadirModification<R>,
modification: &mut DatadirModification<T>,
xlrec: &XlRelmapUpdate,
decoded: &DecodedWALRecord,
) -> Result<()> {
@@ -878,7 +877,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
fn put_rel_creation(
&mut self,
modification: &mut DatadirModification<R>,
modification: &mut DatadirModification<T>,
rel: RelTag,
) -> Result<()> {
self.relsize_cache.insert(rel, 0);
@@ -888,7 +887,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
fn put_rel_page_image(
&mut self,
modification: &mut DatadirModification<R>,
modification: &mut DatadirModification<T>,
rel: RelTag,
blknum: BlockNumber,
img: Bytes,
@@ -900,7 +899,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
fn put_rel_wal_record(
&mut self,
modification: &mut DatadirModification<R>,
modification: &mut DatadirModification<T>,
rel: RelTag,
blknum: BlockNumber,
rec: ZenithWalRecord,
@@ -912,7 +911,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
fn put_rel_truncation(
&mut self,
modification: &mut DatadirModification<R>,
modification: &mut DatadirModification<T>,
rel: RelTag,
nblocks: BlockNumber,
) -> Result<()> {
@@ -923,7 +922,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
fn put_rel_drop(
&mut self,
modification: &mut DatadirModification<R>,
modification: &mut DatadirModification<T>,
rel: RelTag,
) -> Result<()> {
modification.put_rel_drop(rel)?;
@@ -948,7 +947,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
fn handle_rel_extend(
&mut self,
modification: &mut DatadirModification<R>,
modification: &mut DatadirModification<T>,
rel: RelTag,
blknum: BlockNumber,
) -> Result<()> {
@@ -986,7 +985,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
fn put_slru_page_image(
&mut self,
modification: &mut DatadirModification<R>,
modification: &mut DatadirModification<T>,
kind: SlruKind,
segno: u32,
blknum: BlockNumber,
@@ -999,7 +998,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
fn handle_slru_extend(
&mut self,
modification: &mut DatadirModification<R>,
modification: &mut DatadirModification<T>,
kind: SlruKind,
segno: u32,
blknum: BlockNumber,
@@ -1052,6 +1051,7 @@ mod tests {
use super::*;
use crate::pgdatadir_mapping::create_test_timeline;
use crate::repository::repo_harness::*;
use crate::repository::Timeline;
use postgres_ffi::pg_constants;
/// Arbitrary relation tag, for testing.
@@ -1062,13 +1062,13 @@ mod tests {
forknum: 0,
};
fn assert_current_logical_size<R: Repository>(_timeline: &DatadirTimeline<R>, _lsn: Lsn) {
fn assert_current_logical_size<T: Timeline>(_timeline: &T, _lsn: Lsn) {
// TODO
}
static ZERO_CHECKPOINT: Bytes = Bytes::from_static(&[0u8; SIZEOF_CHECKPOINT]);
fn init_walingest_test<R: Repository>(tline: &DatadirTimeline<R>) -> Result<WalIngest<R>> {
fn init_walingest_test<T: DatadirTimeline>(tline: &T) -> Result<WalIngest<T>> {
let mut m = tline.begin_modification();
m.put_checkpoint(ZERO_CHECKPOINT.clone())?;
m.put_relmap_file(0, 111, Bytes::from(""))?; // dummy relmapper file
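
Aside: the `begin_modification` / `commit` pair batches writes and stamps them with a single LSN. A minimal sketch of that pattern under assumed shapes (the real `DatadirModification` is considerably richer):

    use std::collections::HashMap;

    trait DatadirTimeline: Sized {
        fn apply_batch(&self, batch: HashMap<String, Vec<u8>>, lsn: u64);

        fn begin_modification(&self) -> DatadirModification<'_, Self> {
            DatadirModification { timeline: self, pending: HashMap::new() }
        }
    }

    struct DatadirModification<'a, T: DatadirTimeline> {
        timeline: &'a T,
        pending: HashMap<String, Vec<u8>>,
    }

    impl<'a, T: DatadirTimeline> DatadirModification<'a, T> {
        fn put(&mut self, key: String, value: Vec<u8>) {
            self.pending.insert(key, value);
        }

        fn commit(self, lsn: u64) {
            // All buffered writes become visible atomically at one LSN.
            self.timeline.apply_batch(self.pending, lsn);
        }
    }
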
@@ -1082,7 +1082,7 @@ mod tests {
fn test_relsize() -> Result<()> {
let repo = RepoHarness::create("test_relsize")?.load();
let tline = create_test_timeline(repo, TIMELINE_ID)?;
let mut walingest = init_walingest_test(&tline)?;
let mut walingest = init_walingest_test(&*tline)?;
let mut m = tline.begin_modification();
walingest.put_rel_creation(&mut m, TESTREL_A)?;
@@ -1098,7 +1098,7 @@ mod tests {
walingest.put_rel_page_image(&mut m, TESTREL_A, 2, TEST_IMG("foo blk 2 at 5"))?;
m.commit(Lsn(0x50))?;
assert_current_logical_size(&tline, Lsn(0x50));
assert_current_logical_size(&*tline, Lsn(0x50));
// The relation was created at LSN 2, not visible at LSN 1 yet.
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x10))?, false);
@@ -1145,7 +1145,7 @@ mod tests {
let mut m = tline.begin_modification();
walingest.put_rel_truncation(&mut m, TESTREL_A, 2)?;
m.commit(Lsn(0x60))?;
assert_current_logical_size(&tline, Lsn(0x60));
assert_current_logical_size(&*tline, Lsn(0x60));
// Check reported size and contents after truncation
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x60))?, 2);
@@ -1210,7 +1210,7 @@ mod tests {
fn test_drop_extend() -> Result<()> {
let repo = RepoHarness::create("test_drop_extend")?.load();
let tline = create_test_timeline(repo, TIMELINE_ID)?;
let mut walingest = init_walingest_test(&tline)?;
let mut walingest = init_walingest_test(&*tline)?;
let mut m = tline.begin_modification();
walingest.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
@@ -1250,7 +1250,7 @@ mod tests {
fn test_truncate_extend() -> Result<()> {
let repo = RepoHarness::create("test_truncate_extend")?.load();
let tline = create_test_timeline(repo, TIMELINE_ID)?;
let mut walingest = init_walingest_test(&tline)?;
let mut walingest = init_walingest_test(&*tline)?;
// Create a 20 MB relation (the size is arbitrary)
let relsize = 20 * 1024 * 1024 / 8192;
@@ -1338,7 +1338,7 @@ mod tests {
fn test_large_rel() -> Result<()> {
let repo = RepoHarness::create("test_large_rel")?.load();
let tline = create_test_timeline(repo, TIMELINE_ID)?;
let mut walingest = init_walingest_test(&tline)?;
let mut walingest = init_walingest_test(&*tline)?;
let mut lsn = 0x10;
for blknum in 0..pg_constants::RELSEG_SIZE + 1 {
@@ -1349,7 +1349,7 @@ mod tests {
m.commit(Lsn(lsn))?;
}
assert_current_logical_size(&tline, Lsn(lsn));
assert_current_logical_size(&*tline, Lsn(lsn));
assert_eq!(
tline.get_rel_size(TESTREL_A, Lsn(lsn))?,
@@ -1365,7 +1365,7 @@ mod tests {
tline.get_rel_size(TESTREL_A, Lsn(lsn))?,
pg_constants::RELSEG_SIZE
);
assert_current_logical_size(&tline, Lsn(lsn));
assert_current_logical_size(&*tline, Lsn(lsn));
// Truncate another block
lsn += 0x10;
@@ -1376,7 +1376,7 @@ mod tests {
tline.get_rel_size(TESTREL_A, Lsn(lsn))?,
pg_constants::RELSEG_SIZE - 1
);
assert_current_logical_size(&tline, Lsn(lsn));
assert_current_logical_size(&*tline, Lsn(lsn));
// Truncate to 1500, and then truncate all the way down to 0, one block at a time
// This tests the behavior at segment boundaries
@@ -1393,7 +1393,7 @@ mod tests {
size -= 1;
}
assert_current_logical_size(&tline, Lsn(lsn));
assert_current_logical_size(&*tline, Lsn(lsn));
Ok(())
}


@@ -25,7 +25,8 @@ use etcd_broker::{
use tokio::select;
use tracing::*;
use crate::DatadirTimelineImpl;
use crate::repository::{Repository, Timeline};
use crate::{RepositoryImpl, TimelineImpl};
use utils::{
lsn::Lsn,
pq_proto::ReplicationFeedback,
@@ -39,7 +40,7 @@ pub(super) fn spawn_connection_manager_task(
id: ZTenantTimelineId,
broker_loop_prefix: String,
mut client: Client,
local_timeline: Arc<DatadirTimelineImpl>,
local_timeline: Arc<TimelineImpl>,
wal_connect_timeout: Duration,
lagging_wal_timeout: Duration,
max_lsn_wal_lag: NonZeroU64,
@@ -245,7 +246,7 @@ async fn exponential_backoff(n: u32, base: f64, max_seconds: f64) {
struct WalreceiverState {
id: ZTenantTimelineId,
/// Use pageserver data about the timeline to filter out some of the safekeepers.
local_timeline: Arc<DatadirTimelineImpl>,
local_timeline: Arc<TimelineImpl>,
/// The timeout on the connection to safekeeper for WAL streaming.
wal_connect_timeout: Duration,
/// The timeout to use to determine when the current connection is "stale" and reconnect to the other one.
@@ -283,7 +284,7 @@ struct EtcdSkTimeline {
impl WalreceiverState {
fn new(
id: ZTenantTimelineId,
local_timeline: Arc<DatadirTimelineImpl>,
local_timeline: Arc<<RepositoryImpl as Repository>::Timeline>,
wal_connect_timeout: Duration,
lagging_wal_timeout: Duration,
max_lsn_wal_lag: NonZeroU64,
@@ -1203,13 +1204,10 @@ mod tests {
tenant_id: harness.tenant_id,
timeline_id: TIMELINE_ID,
},
local_timeline: Arc::new(DatadirTimelineImpl::new(
harness
.load()
.create_empty_timeline(TIMELINE_ID, Lsn(0))
.expect("Failed to create an empty timeline for dummy wal connection manager"),
10_000,
)),
local_timeline: harness
.load()
.create_empty_timeline(TIMELINE_ID, Lsn(0))
.expect("Failed to create an empty timeline for dummy wal connection manager"),
wal_connect_timeout: Duration::from_secs(1),
lagging_wal_timeout: Duration::from_secs(1),
max_lsn_wal_lag: NonZeroU64::new(1).unwrap(),
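
Aside: `TimelineImpl` and `Arc<<RepositoryImpl as Repository>::Timeline>` spell the same concrete type; the alias presumably goes through the `Repository` trait's associated type, roughly:

    use std::sync::Arc;

    trait Repository {
        type Timeline;
    }

    struct RepositoryImpl;
    struct LayeredTimeline;

    impl Repository for RepositoryImpl {
        type Timeline = LayeredTimeline;
    }

    // Both spellings below denote LayeredTimeline.
    type TimelineImpl = <RepositoryImpl as Repository>::Timeline;

    fn hold(t: Arc<TimelineImpl>) -> Arc<LayeredTimeline> {
        t
    }
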


@@ -20,6 +20,7 @@ use tracing::{debug, error, info, info_span, trace, warn, Instrument};
use super::TaskEvent;
use crate::{
http::models::WalReceiverEntry,
pgdatadir_mapping::DatadirTimeline,
repository::{Repository, Timeline},
tenant_mgr,
walingest::WalIngest,
@@ -177,7 +178,7 @@ pub async fn handle_walreceiver_connection(
caught_up = true;
}
let timeline_to_check = Arc::clone(&timeline.tline);
let timeline_to_check = Arc::clone(&timeline);
tokio::task::spawn_blocking(move || timeline_to_check.check_checkpoint_distance())
.await
.with_context(|| {
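
Aside: the `Arc::clone` exists because `spawn_blocking` requires a `'static` closure that owns its captures. The pattern, with a stand-in type and method:

    use std::sync::Arc;

    struct Timeline;

    impl Timeline {
        fn check_checkpoint_distance(&self) {
            // Stand-in for the real, potentially blocking check.
        }
    }

    async fn check(timeline: &Arc<Timeline>) -> Result<(), tokio::task::JoinError> {
        // Clone the Arc so the blocking task owns a handle independent
        // of the async caller's borrow.
        let timeline_to_check = Arc::clone(timeline);
        tokio::task::spawn_blocking(move || timeline_to_check.check_checkpoint_distance()).await
    }
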
@@ -225,7 +226,7 @@ pub async fn handle_walreceiver_connection(
// The last LSN we processed. It is not guaranteed to survive pageserver crash.
let write_lsn = u64::from(last_lsn);
// `disk_consistent_lsn` is the LSN at which page server guarantees local persistence of all received data
let flush_lsn = u64::from(timeline.tline.get_disk_consistent_lsn());
let flush_lsn = u64::from(timeline.get_disk_consistent_lsn());
// The last LSN that is synced to remote storage and is guaranteed to survive pageserver crash
// Used by safekeepers to remove WAL preceding `remote_consistent_lsn`.
let apply_lsn = u64::from(timeline_remote_consistent_lsn);
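
Aside: the three LSNs form a durability ladder in the feedback sent to safekeepers (field names illustrative, not `ReplicationFeedback`'s real layout):

    struct FeedbackLsns {
        write_lsn: u64, // last record processed; not guaranteed to survive a crash
        flush_lsn: u64, // disk_consistent_lsn: durable on the pageserver's disk
        apply_lsn: u64, // remote_consistent_lsn: durable in remote storage;
                        // safekeepers may trim WAL up to this point
    }
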