Remove Timeline trait, rename LayeredTimeline struct into Timeline

This commit is contained in:
Kirill Bulatov
2022-08-18 16:03:57 +03:00
committed by Kirill Bulatov
parent 12e87f0df3
commit 8043612334
14 changed files with 290 additions and 370 deletions

View File

@@ -22,8 +22,8 @@ use std::time::SystemTime;
use tar::{Builder, EntryType, Header};
use tracing::*;
use crate::layered_repository::Timeline;
use crate::reltag::{RelTag, SlruKind};
use crate::DatadirTimeline;
use postgres_ffi::v14::pg_constants;
use postgres_ffi::v14::xlog_utils::{generate_wal_segment, normalize_lsn, XLogFileName};
@@ -36,13 +36,12 @@ use utils::lsn::Lsn;
/// This is short-living object only for the time of tarball creation,
/// created mostly to avoid passing a lot of parameters between various functions
/// used for constructing tarball.
pub struct Basebackup<'a, W, T>
pub struct Basebackup<'a, W>
where
W: Write,
T: DatadirTimeline,
{
ar: Builder<AbortableWrite<W>>,
timeline: &'a Arc<T>,
timeline: &'a Arc<Timeline>,
pub lsn: Lsn,
prev_record_lsn: Lsn,
full_backup: bool,
@@ -57,18 +56,17 @@ where
// * When working without safekeepers. In this situation it is important to match the lsn
// we are taking basebackup on with the lsn that is used in pageserver's walreceiver
// to start the replication.
impl<'a, W, T> Basebackup<'a, W, T>
impl<'a, W> Basebackup<'a, W>
where
W: Write,
T: DatadirTimeline,
{
pub fn new(
write: W,
timeline: &'a Arc<T>,
timeline: &'a Arc<Timeline>,
req_lsn: Option<Lsn>,
prev_lsn: Option<Lsn>,
full_backup: bool,
) -> Result<Basebackup<'a, W, T>> {
) -> Result<Basebackup<'a, W>> {
// Compute postgres doesn't have any previous WAL files, but the first
// record that it's going to write needs to include the LSN of the
// previous record (xl_prev). We include prev_record_lsn in the
@@ -404,10 +402,9 @@ where
}
}
impl<'a, W, T> Drop for Basebackup<'a, W, T>
impl<'a, W> Drop for Basebackup<'a, W>
where
W: Write,
T: DatadirTimeline,
{
/// If the basebackup was not finished, prevent the Archive::drop() from
/// writing the end-of-archive marker.

View File

@@ -11,10 +11,9 @@ use super::models::{
StatusResponse, TenantConfigRequest, TenantCreateRequest, TenantCreateResponse, TenantInfo,
TimelineCreateRequest,
};
use crate::layered_repository::{metadata::TimelineMetadata, LayeredTimeline};
use crate::pgdatadir_mapping::DatadirTimeline;
use crate::layered_repository::{metadata::TimelineMetadata, Timeline};
use crate::repository::Repository;
use crate::repository::{LocalTimelineState, RepositoryTimeline};
use crate::repository::{Repository, Timeline};
use crate::storage_sync;
use crate::storage_sync::index::{RemoteIndex, RemoteTimeline};
use crate::tenant_config::TenantConfOpt;
@@ -85,7 +84,7 @@ fn get_config(request: &Request<Body>) -> &'static PageServerConf {
// Helper functions to construct a LocalTimelineInfo struct for a timeline
fn local_timeline_info_from_loaded_timeline(
timeline: &LayeredTimeline,
timeline: &Timeline,
include_non_incremental_logical_size: bool,
include_non_incremental_physical_size: bool,
) -> anyhow::Result<LocalTimelineInfo> {
@@ -160,7 +159,7 @@ fn local_timeline_info_from_unloaded_timeline(metadata: &TimelineMetadata) -> Lo
}
fn local_timeline_info_from_repo_timeline(
repo_timeline: &RepositoryTimeline<LayeredTimeline>,
repo_timeline: &RepositoryTimeline<Timeline>,
include_non_incremental_logical_size: bool,
include_non_incremental_physical_size: bool,
) -> anyhow::Result<LocalTimelineInfo> {

View File

@@ -11,6 +11,7 @@ use bytes::Bytes;
use tracing::*;
use walkdir::WalkDir;
use crate::layered_repository::Timeline;
use crate::pgdatadir_mapping::*;
use crate::reltag::{RelTag, SlruKind};
use crate::walingest::WalIngest;
@@ -39,9 +40,9 @@ pub fn get_lsn_from_controlfile(path: &Path) -> Result<Lsn> {
/// This is currently only used to import a cluster freshly created by initdb.
/// The code that deals with the checkpoint would not work right if the
/// cluster was not shut down cleanly.
pub fn import_timeline_from_postgres_datadir<T: DatadirTimeline>(
pub fn import_timeline_from_postgres_datadir(
path: &Path,
tline: &T,
tline: &Timeline,
lsn: Lsn,
) -> Result<()> {
let mut pg_control: Option<ControlFileData> = None;
@@ -99,8 +100,8 @@ pub fn import_timeline_from_postgres_datadir<T: DatadirTimeline>(
}
// subroutine of import_timeline_from_postgres_datadir(), to load one relation file.
fn import_rel<T: DatadirTimeline, Reader: Read>(
modification: &mut DatadirModification<T>,
fn import_rel<Reader: Read>(
modification: &mut DatadirModification,
path: &Path,
spcoid: Oid,
dboid: Oid,
@@ -178,8 +179,8 @@ fn import_rel<T: DatadirTimeline, Reader: Read>(
/// Import an SLRU segment file
///
fn import_slru<T: DatadirTimeline, Reader: Read>(
modification: &mut DatadirModification<T>,
fn import_slru<Reader: Read>(
modification: &mut DatadirModification,
slru: SlruKind,
path: &Path,
mut reader: Reader,
@@ -234,12 +235,7 @@ fn import_slru<T: DatadirTimeline, Reader: Read>(
/// Scan PostgreSQL WAL files in given directory and load all records between
/// 'startpoint' and 'endpoint' into the repository.
fn import_wal<T: DatadirTimeline>(
walpath: &Path,
tline: &T,
startpoint: Lsn,
endpoint: Lsn,
) -> Result<()> {
fn import_wal(walpath: &Path, tline: &Timeline, startpoint: Lsn, endpoint: Lsn) -> Result<()> {
let mut waldecoder = WalStreamDecoder::new(startpoint);
let mut segno = startpoint.segment_number(pg_constants::WAL_SEGMENT_SIZE);
@@ -305,12 +301,12 @@ fn import_wal<T: DatadirTimeline>(
Ok(())
}
pub fn import_basebackup_from_tar<T: DatadirTimeline, Reader: Read>(
tline: &T,
pub fn import_basebackup_from_tar<Reader: Read>(
tline: &Timeline,
reader: Reader,
base_lsn: Lsn,
) -> Result<()> {
info!("importing base at {}", base_lsn);
info!("importing base at {base_lsn}");
let mut modification = tline.begin_modification(base_lsn);
modification.init_empty()?;
@@ -347,8 +343,8 @@ pub fn import_basebackup_from_tar<T: DatadirTimeline, Reader: Read>(
Ok(())
}
pub fn import_wal_from_tar<T: DatadirTimeline, Reader: Read>(
tline: &T,
pub fn import_wal_from_tar<Reader: Read>(
tline: &Timeline,
reader: Reader,
start_lsn: Lsn,
end_lsn: Lsn,
@@ -428,8 +424,8 @@ pub fn import_wal_from_tar<T: DatadirTimeline, Reader: Read>(
Ok(())
}
pub fn import_file<T: DatadirTimeline, Reader: Read>(
modification: &mut DatadirModification<T>,
pub fn import_file<Reader: Read>(
modification: &mut DatadirModification,
file_path: &Path,
reader: Reader,
len: usize,

View File

@@ -31,7 +31,7 @@ use crate::config::PageServerConf;
use crate::storage_sync::index::RemoteIndex;
use crate::tenant_config::{TenantConf, TenantConfOpt};
use crate::repository::{GcResult, Repository, RepositoryTimeline, Timeline};
use crate::repository::{GcResult, Repository, RepositoryTimeline};
use crate::thread_mgr;
use crate::walredo::WalRedoManager;
use crate::CheckpointConfig;
@@ -61,7 +61,7 @@ mod timeline;
use storage_layer::Layer;
use timeline::LayeredTimelineEntry;
pub use timeline::LayeredTimeline;
pub use timeline::Timeline;
// re-export this function so that page_cache.rs can use it.
pub use crate::layered_repository::ephemeral_file::writeback as writeback_ephemeral_file;
@@ -121,15 +121,13 @@ pub struct LayeredRepository {
/// Public interface
impl Repository for LayeredRepository {
type Timeline = LayeredTimeline;
fn get_timeline(&self, timelineid: ZTimelineId) -> Option<RepositoryTimeline<Self::Timeline>> {
fn get_timeline(&self, timelineid: ZTimelineId) -> Option<RepositoryTimeline<Timeline>> {
let timelines = self.timelines.lock().unwrap();
self.get_timeline_internal(timelineid, &timelines)
.map(RepositoryTimeline::from)
}
fn get_timeline_load(&self, timelineid: ZTimelineId) -> Result<Arc<LayeredTimeline>> {
fn get_timeline_load(&self, timelineid: ZTimelineId) -> Result<Arc<Timeline>> {
let mut timelines = self.timelines.lock().unwrap();
match self.get_timeline_load_internal(timelineid, &mut timelines)? {
Some(local_loaded_timeline) => Ok(local_loaded_timeline),
@@ -140,7 +138,7 @@ impl Repository for LayeredRepository {
}
}
fn list_timelines(&self) -> Vec<(ZTimelineId, RepositoryTimeline<Self::Timeline>)> {
fn list_timelines(&self) -> Vec<(ZTimelineId, RepositoryTimeline<Timeline>)> {
self.timelines
.lock()
.unwrap()
@@ -158,7 +156,7 @@ impl Repository for LayeredRepository {
&self,
timeline_id: ZTimelineId,
initdb_lsn: Lsn,
) -> Result<Arc<LayeredTimeline>> {
) -> Result<Arc<Timeline>> {
let mut timelines = self.timelines.lock().unwrap();
let vacant_timeline_entry = match timelines.entry(timeline_id) {
Entry::Occupied(_) => bail!("Timeline already exists"),
@@ -176,7 +174,7 @@ impl Repository for LayeredRepository {
let metadata = TimelineMetadata::new(Lsn(0), None, None, Lsn(0), initdb_lsn, initdb_lsn);
timeline::save_metadata(self.conf, timeline_id, self.tenant_id, &metadata, true)?;
let timeline = LayeredTimeline::new(
let timeline = Timeline::new(
self.conf,
Arc::clone(&self.tenant_conf),
metadata,
@@ -539,7 +537,7 @@ impl LayeredRepository {
&self,
timelineid: ZTimelineId,
timelines: &mut HashMap<ZTimelineId, LayeredTimelineEntry>,
) -> anyhow::Result<Option<Arc<LayeredTimeline>>> {
) -> anyhow::Result<Option<Arc<Timeline>>> {
match timelines.get(&timelineid) {
Some(entry) => match entry {
LayeredTimelineEntry::Loaded(local_timeline) => {
@@ -574,7 +572,7 @@ impl LayeredRepository {
&self,
timeline_id: ZTimelineId,
timelines: &mut HashMap<ZTimelineId, LayeredTimelineEntry>,
) -> anyhow::Result<Arc<LayeredTimeline>> {
) -> anyhow::Result<Arc<Timeline>> {
let metadata = load_metadata(self.conf, timeline_id, self.tenant_id)
.context("failed to load metadata")?;
let disk_consistent_lsn = metadata.disk_consistent_lsn();
@@ -591,7 +589,7 @@ impl LayeredRepository {
.map(LayeredTimelineEntry::Loaded);
let _enter = info_span!("loading local timeline").entered();
let timeline = LayeredTimeline::new(
let timeline = Timeline::new(
self.conf,
Arc::clone(&self.tenant_conf),
metadata,

View File

@@ -9,7 +9,7 @@ use once_cell::sync::Lazy;
use tracing::*;
use std::cmp::{max, min, Ordering};
use std::collections::{hash_map::Entry, HashMap, HashSet};
use std::collections::{HashMap, HashSet};
use std::fs;
use std::fs::{File, OpenOptions};
use std::io::Write;
@@ -43,7 +43,6 @@ use crate::pgdatadir_mapping::BlockNumber;
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::reltag::RelTag;
use crate::tenant_config::TenantConfOpt;
use crate::DatadirTimeline;
use postgres_ffi::v14::xlog_utils::to_pg_timestamp;
use utils::{
@@ -52,7 +51,7 @@ use utils::{
zid::{ZTenantId, ZTimelineId},
};
use crate::repository::{GcResult, RepositoryTimeline, Timeline, TimelineWriter};
use crate::repository::{GcResult, RepositoryTimeline, TimelineWriter};
use crate::repository::{Key, Value};
use crate::thread_mgr;
use crate::virtual_file::VirtualFile;
@@ -160,7 +159,7 @@ static PERSISTENT_BYTES_WRITTEN: Lazy<IntCounter> = Lazy::new(|| {
#[derive(Clone)]
pub enum LayeredTimelineEntry {
Loaded(Arc<LayeredTimeline>),
Loaded(Arc<Timeline>),
Unloaded {
id: ZTimelineId,
metadata: TimelineMetadata,
@@ -191,7 +190,7 @@ impl LayeredTimelineEntry {
}
}
fn ensure_loaded(&self) -> anyhow::Result<&Arc<LayeredTimeline>> {
fn ensure_loaded(&self) -> anyhow::Result<&Arc<Timeline>> {
match self {
LayeredTimelineEntry::Loaded(timeline) => Ok(timeline),
LayeredTimelineEntry::Unloaded { .. } => {
@@ -213,7 +212,7 @@ impl LayeredTimelineEntry {
}
}
impl From<LayeredTimelineEntry> for RepositoryTimeline<LayeredTimeline> {
impl From<LayeredTimelineEntry> for RepositoryTimeline<Timeline> {
fn from(entry: LayeredTimelineEntry) -> Self {
match entry {
LayeredTimelineEntry::Loaded(timeline) => RepositoryTimeline::Loaded(timeline as _),
@@ -288,7 +287,7 @@ impl TimelineMetrics {
}
}
pub struct LayeredTimeline {
pub struct Timeline {
conf: &'static PageServerConf,
tenant_conf: Arc<RwLock<TenantConfOpt>>,
@@ -385,7 +384,7 @@ pub struct LayeredTimeline {
pub last_received_wal: Mutex<Option<WalReceiverInfo>>,
/// Relation size cache
rel_size_cache: RwLock<HashMap<RelTag, (Lsn, BlockNumber)>>,
pub rel_size_cache: RwLock<HashMap<RelTag, (Lsn, BlockNumber)>>,
}
pub struct WalReceiverInfo {
@@ -394,46 +393,6 @@ pub struct WalReceiverInfo {
pub last_received_msg_ts: u128,
}
/// Inherit all the functions from DatadirTimeline, to provide the
/// functionality to store PostgreSQL relations, SLRUs, etc. in a
/// LayeredTimeline.
impl DatadirTimeline for LayeredTimeline {
fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option<BlockNumber> {
let rel_size_cache = self.rel_size_cache.read().unwrap();
if let Some((cached_lsn, nblocks)) = rel_size_cache.get(tag) {
if lsn >= *cached_lsn {
return Some(*nblocks);
}
}
None
}
fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
let mut rel_size_cache = self.rel_size_cache.write().unwrap();
match rel_size_cache.entry(tag) {
Entry::Occupied(mut entry) => {
let cached_lsn = entry.get_mut();
if lsn >= cached_lsn.0 {
*cached_lsn = (lsn, nblocks);
}
}
Entry::Vacant(entry) => {
entry.insert((lsn, nblocks));
}
}
}
fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
let mut rel_size_cache = self.rel_size_cache.write().unwrap();
rel_size_cache.insert(tag, (lsn, nblocks));
}
fn remove_cached_rel_size(&self, tag: &RelTag) {
let mut rel_size_cache = self.rel_size_cache.write().unwrap();
rel_size_cache.remove(tag);
}
}
///
/// Information about how much history needs to be retained, needed by
/// Garbage Collection.
@@ -464,45 +423,37 @@ pub struct GcInfo {
}
/// Public interface functions
impl Timeline for LayeredTimeline {
fn get_ancestor_lsn(&self) -> Lsn {
impl Timeline {
//------------------------------------------------------------------------------
// Public GET functions
//------------------------------------------------------------------------------
/// Get the LSN where this branch was created
pub fn get_ancestor_lsn(&self) -> Lsn {
self.ancestor_lsn
}
fn get_ancestor_timeline_id(&self) -> Option<ZTimelineId> {
/// Get the ancestor's timeline id
pub fn get_ancestor_timeline_id(&self) -> Option<ZTimelineId> {
self.ancestor_timeline
.as_ref()
.map(LayeredTimelineEntry::timeline_id)
}
/// Wait until WAL has been received up to the given LSN.
fn wait_lsn(&self, lsn: Lsn) -> anyhow::Result<()> {
// This should never be called from the WAL receiver thread, because that could lead
// to a deadlock.
ensure!(
!IS_WAL_RECEIVER.with(|c| c.get()),
"wait_lsn called by WAL receiver thread"
);
self.metrics.wait_lsn_time_histo.observe_closure_duration(
|| self.last_record_lsn
.wait_for_timeout(lsn, self.conf.wait_lsn_timeout)
.with_context(|| {
format!(
"Timed out while waiting for WAL record at LSN {} to arrive, last_record_lsn {} disk consistent LSN={}",
lsn, self.get_last_record_lsn(), self.get_disk_consistent_lsn()
)
}))?;
Ok(())
}
fn get_latest_gc_cutoff_lsn(&self) -> RwLockReadGuard<Lsn> {
/// Lock and get timeline's GC cuttof
pub fn get_latest_gc_cutoff_lsn(&self) -> RwLockReadGuard<Lsn> {
self.latest_gc_cutoff_lsn.read().unwrap()
}
/// Look up the value with the given a key
fn get(&self, key: Key, lsn: Lsn) -> Result<Bytes> {
/// Look up given page version.
///
/// NOTE: It is considered an error to 'get' a key that doesn't exist. The abstraction
/// above this needs to store suitable metadata to track what data exists with
/// what keys, in separate metadata entries. If a non-existent key is requested,
/// the Repository implementation may incorrectly return a value from an ancestor
/// branch, for example, or waste a lot of cycles chasing the non-existing key.
///
pub fn get(&self, key: Key, lsn: Lsn) -> Result<Bytes> {
// Check the page cache. We will get back the most recent page with lsn <= `lsn`.
// The cached image can be returned directly if there is no WAL between the cached image
// and requested LSN. The cached image can also be used to reduce the amount of WAL needed
@@ -531,68 +482,31 @@ impl Timeline for LayeredTimeline {
.observe_closure_duration(|| self.reconstruct_value(key, lsn, reconstruct_state))
}
/// Public entry point for checkpoint(). All the logic is in the private
/// checkpoint_internal function, this public facade just wraps it for
/// metrics collection.
fn checkpoint(&self, cconf: CheckpointConfig) -> anyhow::Result<()> {
match cconf {
CheckpointConfig::Flush => {
self.freeze_inmem_layer(false);
self.flush_frozen_layers(true)
}
CheckpointConfig::Forced => {
self.freeze_inmem_layer(false);
self.flush_frozen_layers(true)?;
self.compact()
}
}
}
///
/// Validate lsn against initdb_lsn and latest_gc_cutoff_lsn.
///
fn check_lsn_is_in_scope(
&self,
lsn: Lsn,
latest_gc_cutoff_lsn: &RwLockReadGuard<Lsn>,
) -> Result<()> {
ensure!(
lsn >= **latest_gc_cutoff_lsn,
"LSN {} is earlier than latest GC horizon {} (we might've already garbage collected needed data)",
lsn,
**latest_gc_cutoff_lsn,
);
Ok(())
}
fn get_last_record_lsn(&self) -> Lsn {
/// Get last or prev record separately. Same as get_last_record_rlsn().last/prev.
pub fn get_last_record_lsn(&self) -> Lsn {
self.last_record_lsn.load().last
}
fn get_prev_record_lsn(&self) -> Lsn {
pub fn get_prev_record_lsn(&self) -> Lsn {
self.last_record_lsn.load().prev
}
fn get_last_record_rlsn(&self) -> RecordLsn {
/// Atomically get both last and prev.
pub fn get_last_record_rlsn(&self) -> RecordLsn {
self.last_record_lsn.load()
}
fn get_disk_consistent_lsn(&self) -> Lsn {
pub fn get_disk_consistent_lsn(&self) -> Lsn {
self.disk_consistent_lsn.load()
}
fn writer<'a>(&'a self) -> Box<dyn TimelineWriter + 'a> {
Box::new(LayeredTimelineWriter {
tl: self,
_write_guard: self.write_lock.lock().unwrap(),
})
}
fn get_physical_size(&self) -> u64 {
/// Get the physical size of the timeline at the latest LSN
pub fn get_physical_size(&self) -> u64 {
self.metrics.current_physical_size_gauge.get()
}
fn get_physical_size_non_incremental(&self) -> anyhow::Result<u64> {
/// Get the physical size of the timeline at the latest LSN non incrementally
pub fn get_physical_size_non_incremental(&self) -> anyhow::Result<u64> {
let timeline_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id);
// total size of layer files in the current timeline directory
let mut total_physical_size = 0;
@@ -611,9 +525,89 @@ impl Timeline for LayeredTimeline {
Ok(total_physical_size)
}
///
/// Wait until WAL has been received and processed up to this LSN.
///
/// You should call this before any of the other get_* or list_* functions. Calling
/// those functions with an LSN that has been processed yet is an error.
///
pub fn wait_lsn(&self, lsn: Lsn) -> anyhow::Result<()> {
// This should never be called from the WAL receiver thread, because that could lead
// to a deadlock.
ensure!(
!IS_WAL_RECEIVER.with(|c| c.get()),
"wait_lsn called by WAL receiver thread"
);
self.metrics.wait_lsn_time_histo.observe_closure_duration(
|| self.last_record_lsn
.wait_for_timeout(lsn, self.conf.wait_lsn_timeout)
.with_context(|| {
format!(
"Timed out while waiting for WAL record at LSN {} to arrive, last_record_lsn {} disk consistent LSN={}",
lsn, self.get_last_record_lsn(), self.get_disk_consistent_lsn()
)
}))?;
Ok(())
}
/// Check that it is valid to request operations with that lsn.
pub fn check_lsn_is_in_scope(
&self,
lsn: Lsn,
latest_gc_cutoff_lsn: &RwLockReadGuard<Lsn>,
) -> Result<()> {
ensure!(
lsn >= **latest_gc_cutoff_lsn,
"LSN {} is earlier than latest GC horizon {} (we might've already garbage collected needed data)",
lsn,
**latest_gc_cutoff_lsn,
);
Ok(())
}
//------------------------------------------------------------------------------
// Public PUT functions, to update the repository with new page versions.
//
// These are called by the WAL receiver to digest WAL records.
//------------------------------------------------------------------------------
/// Flush to disk all data that was written with the put_* functions
///
/// NOTE: This has nothing to do with checkpoint in PostgreSQL. We don't
/// know anything about them here in the repository.
pub fn checkpoint(&self, cconf: CheckpointConfig) -> anyhow::Result<()> {
match cconf {
CheckpointConfig::Flush => {
self.freeze_inmem_layer(false);
self.flush_frozen_layers(true)
}
CheckpointConfig::Forced => {
self.freeze_inmem_layer(false);
self.flush_frozen_layers(true)?;
self.compact()
}
}
}
/// Mutate the timeline with a [`TimelineWriter`].
///
/// FIXME: This ought to return &'a TimelineWriter, where TimelineWriter
/// is a generic type in this trait. But that doesn't currently work in
/// Rust: https://rust-lang.github.io/rfcs/1598-generic_associated_types.html
/// TODO kb replace with the concrete type
pub fn writer<'a>(&'a self) -> Box<dyn TimelineWriter + 'a> {
Box::new(LayeredTimelineWriter {
tl: self,
_write_guard: self.write_lock.lock().unwrap(),
})
}
}
impl LayeredTimeline {
// Private functions
impl Timeline {
fn get_checkpoint_distance(&self) -> u64 {
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
@@ -662,8 +656,8 @@ impl LayeredTimeline {
tenant_id: ZTenantId,
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
upload_layers: bool,
) -> LayeredTimeline {
let mut result = LayeredTimeline {
) -> Timeline {
let mut result = Timeline {
conf,
tenant_conf,
timeline_id,
@@ -1014,7 +1008,7 @@ impl LayeredTimeline {
Some((lsn, img))
}
fn get_ancestor_timeline(&self) -> Result<Arc<LayeredTimeline>> {
fn get_ancestor_timeline(&self) -> Result<Arc<Timeline>> {
let ancestor = self
.ancestor_timeline
.as_ref()
@@ -1135,7 +1129,7 @@ impl LayeredTimeline {
/// Also flush after a period of time without new data -- it helps
/// safekeepers to regard pageserver as caught up and suspend activity.
///
pub fn check_checkpoint_distance(self: &Arc<LayeredTimeline>) -> Result<()> {
pub fn check_checkpoint_distance(self: &Arc<Timeline>) -> Result<()> {
let last_lsn = self.get_last_record_lsn();
let layers = self.layers.read().unwrap();
if let Some(open_layer) = &layers.open_layer {
@@ -2211,12 +2205,12 @@ fn layer_traversal_error(
}
struct LayeredTimelineWriter<'a> {
tl: &'a LayeredTimeline,
tl: &'a Timeline,
_write_guard: MutexGuard<'a, ()>,
}
impl Deref for LayeredTimelineWriter<'_> {
type Target = dyn Timeline;
type Target = Timeline;
fn deref(&self) -> &Self::Target {
self.tl

View File

@@ -28,8 +28,6 @@ use tracing::info;
use crate::thread_mgr::ThreadKind;
use metrics::{register_int_gauge_vec, IntGaugeVec};
use pgdatadir_mapping::DatadirTimeline;
/// Current storage format version
///
/// This is embedded in the metadata file, and also in the header of all the

View File

@@ -30,11 +30,11 @@ use utils::{
use crate::basebackup;
use crate::config::{PageServerConf, ProfilingConfig};
use crate::import_datadir::{import_basebackup_from_tar, import_wal_from_tar};
use crate::pgdatadir_mapping::{DatadirTimeline, LsnForTimestamp};
use crate::layered_repository::Timeline;
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::profiling::profpoint_start;
use crate::reltag::RelTag;
use crate::repository::Repository;
use crate::repository::Timeline;
use crate::tenant_mgr;
use crate::thread_mgr;
use crate::thread_mgr::ThreadKind;
@@ -636,8 +636,8 @@ impl PageServerHandler {
/// In either case, if the page server hasn't received the WAL up to the
/// requested LSN yet, we will wait for it to arrive. The return value is
/// the LSN that should be used to look up the page versions.
fn wait_or_get_last_lsn<T: DatadirTimeline>(
timeline: &T,
fn wait_or_get_last_lsn(
timeline: &Timeline,
mut lsn: Lsn,
latest: bool,
latest_gc_cutoff_lsn: &RwLockReadGuard<Lsn>,
@@ -684,9 +684,9 @@ impl PageServerHandler {
Ok(lsn)
}
fn handle_get_rel_exists_request<T: DatadirTimeline>(
fn handle_get_rel_exists_request(
&self,
timeline: &T,
timeline: &Timeline,
req: &PagestreamExistsRequest,
) -> Result<PagestreamBeMessage> {
let _enter = info_span!("get_rel_exists", rel = %req.rel, req_lsn = %req.lsn).entered();
@@ -701,9 +701,9 @@ impl PageServerHandler {
}))
}
fn handle_get_nblocks_request<T: DatadirTimeline>(
fn handle_get_nblocks_request(
&self,
timeline: &T,
timeline: &Timeline,
req: &PagestreamNblocksRequest,
) -> Result<PagestreamBeMessage> {
let _enter = info_span!("get_nblocks", rel = %req.rel, req_lsn = %req.lsn).entered();
@@ -717,9 +717,9 @@ impl PageServerHandler {
}))
}
fn handle_db_size_request<T: DatadirTimeline>(
fn handle_db_size_request(
&self,
timeline: &T,
timeline: &Timeline,
req: &PagestreamDbSizeRequest,
) -> Result<PagestreamBeMessage> {
let _enter = info_span!("get_db_size", dbnode = %req.dbnode, req_lsn = %req.lsn).entered();
@@ -735,9 +735,9 @@ impl PageServerHandler {
}))
}
fn handle_get_page_at_lsn_request<T: DatadirTimeline>(
fn handle_get_page_at_lsn_request(
&self,
timeline: &T,
timeline: &Timeline,
req: &PagestreamGetPageRequest,
) -> Result<PagestreamBeMessage> {
let _enter = info_span!("get_page", rel = %req.rel, blkno = &req.blkno, req_lsn = %req.lsn)

View File

@@ -7,8 +7,8 @@
//! Clarify that)
//!
use crate::keyspace::{KeySpace, KeySpaceAccum};
use crate::layered_repository::Timeline;
use crate::reltag::{RelTag, SlruKind};
use crate::repository::Timeline;
use crate::repository::*;
use crate::walrecord::ZenithWalRecord;
use anyhow::{bail, ensure, Result};
@@ -18,7 +18,7 @@ use postgres_ffi::v14::xlog_utils::TimestampTz;
use postgres_ffi::BLCKSZ;
use postgres_ffi::{Oid, TransactionId};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::collections::{hash_map, HashMap, HashSet};
use std::ops::Range;
use tracing::{debug, trace, warn};
use utils::{bin_ser::BeSer, lsn::Lsn};
@@ -35,23 +35,13 @@ pub enum LsnForTimestamp {
}
///
/// This trait provides all the functionality to store PostgreSQL relations, SLRUs,
/// This impl provides all the functionality to store PostgreSQL relations, SLRUs,
/// and other special kinds of files, in a versioned key-value store. The
/// Timeline trait provides the key-value store.
/// Timeline struct provides the key-value store.
///
/// This is a trait, so that we can easily include all these functions in a Timeline
/// implementation. You're not expected to have different implementations of this trait,
/// rather, this provides an interface and implementation, over Timeline.
///
/// If you wanted to store other kinds of data in the Neon repository, e.g.
/// flat files or MySQL, you would create a new trait like this, with all the
/// functions that make sense for the kind of data you're storing. For flat files,
/// for example, you might have a function like "fn read(path, offset, size)".
/// We might also have that situation in the future, to support multiple PostgreSQL
/// versions, if there are big changes in how the data is organized in the data
/// directory, or if new special files are introduced.
///
pub trait DatadirTimeline: Timeline {
/// This is a separate impl, so that we can easily include all these functions in a Timeline
/// implementation, and might be moved into a separate struct later.
impl Timeline {
/// Start ingesting a WAL record, or other atomic modification of
/// the timeline.
///
@@ -75,7 +65,7 @@ pub trait DatadirTimeline: Timeline {
/// functions of the timeline until you finish! And if you update the
/// same page twice, the last update wins.
///
fn begin_modification(&self, lsn: Lsn) -> DatadirModification<Self>
pub fn begin_modification(&self, lsn: Lsn) -> DatadirModification
where
Self: Sized,
{
@@ -93,7 +83,7 @@ pub trait DatadirTimeline: Timeline {
//------------------------------------------------------------------------------
/// Look up given page version.
fn get_rel_page_at_lsn(&self, tag: RelTag, blknum: BlockNumber, lsn: Lsn) -> Result<Bytes> {
pub fn get_rel_page_at_lsn(&self, tag: RelTag, blknum: BlockNumber, lsn: Lsn) -> Result<Bytes> {
ensure!(tag.relnode != 0, "invalid relnode");
let nblocks = self.get_rel_size(tag, lsn)?;
@@ -110,7 +100,7 @@ pub trait DatadirTimeline: Timeline {
}
// Get size of a database in blocks
fn get_db_size(&self, spcnode: Oid, dbnode: Oid, lsn: Lsn) -> Result<usize> {
pub fn get_db_size(&self, spcnode: Oid, dbnode: Oid, lsn: Lsn) -> Result<usize> {
let mut total_blocks = 0;
let rels = self.list_rels(spcnode, dbnode, lsn)?;
@@ -123,7 +113,7 @@ pub trait DatadirTimeline: Timeline {
}
/// Get size of a relation file
fn get_rel_size(&self, tag: RelTag, lsn: Lsn) -> Result<BlockNumber> {
pub fn get_rel_size(&self, tag: RelTag, lsn: Lsn) -> Result<BlockNumber> {
ensure!(tag.relnode != 0, "invalid relnode");
if let Some(nblocks) = self.get_cached_rel_size(&tag, lsn) {
@@ -151,7 +141,7 @@ pub trait DatadirTimeline: Timeline {
}
/// Does relation exist?
fn get_rel_exists(&self, tag: RelTag, lsn: Lsn) -> Result<bool> {
pub fn get_rel_exists(&self, tag: RelTag, lsn: Lsn) -> Result<bool> {
ensure!(tag.relnode != 0, "invalid relnode");
// first try to lookup relation in cache
@@ -169,7 +159,7 @@ pub trait DatadirTimeline: Timeline {
}
/// Get a list of all existing relations in given tablespace and database.
fn list_rels(&self, spcnode: Oid, dbnode: Oid, lsn: Lsn) -> Result<HashSet<RelTag>> {
pub fn list_rels(&self, spcnode: Oid, dbnode: Oid, lsn: Lsn) -> Result<HashSet<RelTag>> {
// fetch directory listing
let key = rel_dir_to_key(spcnode, dbnode);
let buf = self.get(key, lsn)?;
@@ -187,7 +177,7 @@ pub trait DatadirTimeline: Timeline {
}
/// Look up given SLRU page version.
fn get_slru_page_at_lsn(
pub fn get_slru_page_at_lsn(
&self,
kind: SlruKind,
segno: u32,
@@ -199,14 +189,19 @@ pub trait DatadirTimeline: Timeline {
}
/// Get size of an SLRU segment
fn get_slru_segment_size(&self, kind: SlruKind, segno: u32, lsn: Lsn) -> Result<BlockNumber> {
pub fn get_slru_segment_size(
&self,
kind: SlruKind,
segno: u32,
lsn: Lsn,
) -> Result<BlockNumber> {
let key = slru_segment_size_to_key(kind, segno);
let mut buf = self.get(key, lsn)?;
Ok(buf.get_u32_le())
}
/// Get size of an SLRU segment
fn get_slru_segment_exists(&self, kind: SlruKind, segno: u32, lsn: Lsn) -> Result<bool> {
pub fn get_slru_segment_exists(&self, kind: SlruKind, segno: u32, lsn: Lsn) -> Result<bool> {
// fetch directory listing
let key = slru_dir_to_key(kind);
let buf = self.get(key, lsn)?;
@@ -223,7 +218,7 @@ pub trait DatadirTimeline: Timeline {
/// so it's not well defined which LSN you get if there were multiple commits
/// "in flight" at that point in time.
///
fn find_lsn_for_timestamp(&self, search_timestamp: TimestampTz) -> Result<LsnForTimestamp> {
pub fn find_lsn_for_timestamp(&self, search_timestamp: TimestampTz) -> Result<LsnForTimestamp> {
let gc_cutoff_lsn_guard = self.get_latest_gc_cutoff_lsn();
let min_lsn = *gc_cutoff_lsn_guard;
let max_lsn = self.get_last_record_lsn();
@@ -286,7 +281,7 @@ pub trait DatadirTimeline: Timeline {
/// Additionally, sets 'found_smaller'/'found_Larger, if encounters any commits
/// with a smaller/larger timestamp.
///
fn is_latest_commit_timestamp_ge_than(
pub fn is_latest_commit_timestamp_ge_than(
&self,
search_timestamp: TimestampTz,
probe_lsn: Lsn,
@@ -317,7 +312,7 @@ pub trait DatadirTimeline: Timeline {
}
/// Get a list of SLRU segments
fn list_slru_segments(&self, kind: SlruKind, lsn: Lsn) -> Result<HashSet<u32>> {
pub fn list_slru_segments(&self, kind: SlruKind, lsn: Lsn) -> Result<HashSet<u32>> {
// fetch directory entry
let key = slru_dir_to_key(kind);
@@ -327,14 +322,14 @@ pub trait DatadirTimeline: Timeline {
Ok(dir.segments)
}
fn get_relmap_file(&self, spcnode: Oid, dbnode: Oid, lsn: Lsn) -> Result<Bytes> {
pub fn get_relmap_file(&self, spcnode: Oid, dbnode: Oid, lsn: Lsn) -> Result<Bytes> {
let key = relmap_file_key(spcnode, dbnode);
let buf = self.get(key, lsn)?;
Ok(buf)
}
fn list_dbdirs(&self, lsn: Lsn) -> Result<HashMap<(Oid, Oid), bool>> {
pub fn list_dbdirs(&self, lsn: Lsn) -> Result<HashMap<(Oid, Oid), bool>> {
// fetch directory entry
let buf = self.get(DBDIR_KEY, lsn)?;
let dir = DbDirectory::des(&buf)?;
@@ -342,13 +337,13 @@ pub trait DatadirTimeline: Timeline {
Ok(dir.dbdirs)
}
fn get_twophase_file(&self, xid: TransactionId, lsn: Lsn) -> Result<Bytes> {
pub fn get_twophase_file(&self, xid: TransactionId, lsn: Lsn) -> Result<Bytes> {
let key = twophase_file_key(xid);
let buf = self.get(key, lsn)?;
Ok(buf)
}
fn list_twophase_files(&self, lsn: Lsn) -> Result<HashSet<TransactionId>> {
pub fn list_twophase_files(&self, lsn: Lsn) -> Result<HashSet<TransactionId>> {
// fetch directory entry
let buf = self.get(TWOPHASEDIR_KEY, lsn)?;
let dir = TwoPhaseDirectory::des(&buf)?;
@@ -356,11 +351,11 @@ pub trait DatadirTimeline: Timeline {
Ok(dir.xids)
}
fn get_control_file(&self, lsn: Lsn) -> Result<Bytes> {
pub fn get_control_file(&self, lsn: Lsn) -> Result<Bytes> {
self.get(CONTROLFILE_KEY, lsn)
}
fn get_checkpoint(&self, lsn: Lsn) -> Result<Bytes> {
pub fn get_checkpoint(&self, lsn: Lsn) -> Result<Bytes> {
self.get(CHECKPOINT_KEY, lsn)
}
@@ -369,7 +364,7 @@ pub trait DatadirTimeline: Timeline {
///
/// Only relation blocks are counted currently. That excludes metadata,
/// SLRUs, twophase files etc.
fn get_current_logical_size_non_incremental(&self, lsn: Lsn) -> Result<usize> {
pub fn get_current_logical_size_non_incremental(&self, lsn: Lsn) -> Result<usize> {
// Fetch list of database dirs and iterate them
let buf = self.get(DBDIR_KEY, lsn)?;
let dbdir = DbDirectory::des(&buf)?;
@@ -391,7 +386,7 @@ pub trait DatadirTimeline: Timeline {
/// Get a KeySpace that covers all the Keys that are in use at the given LSN.
/// Anything that's not listed maybe removed from the underlying storage (from
/// that LSN forwards).
fn collect_keyspace(&self, lsn: Lsn) -> Result<KeySpace> {
pub fn collect_keyspace(&self, lsn: Lsn) -> Result<KeySpace> {
// Iterate through key ranges, greedily packing them into partitions
let mut result = KeySpaceAccum::new();
@@ -465,27 +460,54 @@ pub trait DatadirTimeline: Timeline {
}
/// Get cached size of relation if it not updated after specified LSN
fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option<BlockNumber>;
pub fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option<BlockNumber> {
let rel_size_cache = self.rel_size_cache.read().unwrap();
if let Some((cached_lsn, nblocks)) = rel_size_cache.get(tag) {
if lsn >= *cached_lsn {
return Some(*nblocks);
}
}
None
}
/// Update cached relation size if there is no more recent update
fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber);
pub fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
let mut rel_size_cache = self.rel_size_cache.write().unwrap();
match rel_size_cache.entry(tag) {
hash_map::Entry::Occupied(mut entry) => {
let cached_lsn = entry.get_mut();
if lsn >= cached_lsn.0 {
*cached_lsn = (lsn, nblocks);
}
}
hash_map::Entry::Vacant(entry) => {
entry.insert((lsn, nblocks));
}
}
}
/// Store cached relation size
fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber);
pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
let mut rel_size_cache = self.rel_size_cache.write().unwrap();
rel_size_cache.insert(tag, (lsn, nblocks));
}
/// Remove cached relation size
fn remove_cached_rel_size(&self, tag: &RelTag);
pub fn remove_cached_rel_size(&self, tag: &RelTag) {
let mut rel_size_cache = self.rel_size_cache.write().unwrap();
rel_size_cache.remove(tag);
}
}
/// DatadirModification represents an operation to ingest an atomic set of
/// updates to the repository. It is created by the 'begin_record'
/// function. It is called for each WAL record, so that all the modifications
/// by a one WAL record appear atomic.
pub struct DatadirModification<'a, T: DatadirTimeline> {
pub struct DatadirModification<'a> {
/// The timeline this modification applies to. You can access this to
/// read the state, but note that any pending updates are *not* reflected
/// in the state in 'tline' yet.
pub tline: &'a T,
pub tline: &'a Timeline,
/// Lsn assigned by begin_modification
pub lsn: Lsn,
@@ -498,7 +520,7 @@ pub struct DatadirModification<'a, T: DatadirTimeline> {
pending_nblocks: isize,
}
impl<'a, T: DatadirTimeline> DatadirModification<'a, T> {
impl<'a> DatadirModification<'a> {
/// Initialize a completely new repository.
///
/// This inserts the directory metadata entries that are assumed to
@@ -1371,7 +1393,7 @@ fn is_slru_block_key(key: Key) -> bool {
pub fn create_test_timeline<R: Repository>(
repo: R,
timeline_id: utils::zid::ZTimelineId,
) -> Result<std::sync::Arc<R::Timeline>> {
) -> Result<std::sync::Arc<Timeline>> {
let tline = repo.create_empty_timeline(timeline_id, Lsn(8))?;
let mut m = tline.begin_modification(Lsn(8));
m.init_empty()?;

View File

@@ -1,19 +1,16 @@
use crate::layered_repository::metadata::TimelineMetadata;
use crate::layered_repository::Timeline;
use crate::storage_sync::index::RemoteIndex;
use crate::walrecord::ZenithWalRecord;
use crate::CheckpointConfig;
use anyhow::{bail, Result};
use byteorder::{ByteOrder, BE};
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::ops::{AddAssign, Range};
use std::sync::{Arc, RwLockReadGuard};
use std::sync::Arc;
use std::time::Duration;
use utils::{
lsn::{Lsn, RecordLsn},
zid::ZTimelineId,
};
use utils::{lsn::Lsn, zid::ZTimelineId};
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, Ord, PartialOrd, Serialize, Deserialize)]
/// Key used in the Repository kv-store.
@@ -185,22 +182,20 @@ impl Value {
/// A repository corresponds to one .neon directory. One repository holds multiple
/// timelines, forked off from the same initial call to 'initdb'.
pub trait Repository: Send + Sync {
type Timeline: crate::DatadirTimeline;
/// Updates timeline based on the `TimelineSyncStatusUpdate`, received from the remote storage synchronization.
/// See [`crate::remote_storage`] for more details about the synchronization.
fn attach_timeline(&self, timeline_id: ZTimelineId) -> Result<()>;
/// Get Timeline handle for given zenith timeline ID.
/// This function is idempotent. It doesn't change internal state in any way.
fn get_timeline(&self, timelineid: ZTimelineId) -> Option<RepositoryTimeline<Self::Timeline>>;
fn get_timeline(&self, timelineid: ZTimelineId) -> Option<RepositoryTimeline<Timeline>>;
/// Get Timeline handle for locally available timeline. Load it into memory if it is not loaded.
fn get_timeline_load(&self, timelineid: ZTimelineId) -> Result<Arc<Self::Timeline>>;
fn get_timeline_load(&self, timelineid: ZTimelineId) -> Result<Arc<Timeline>>;
/// Lists timelines the repository contains.
/// Up to repository's implementation to omit certain timelines that ar not considered ready for use.
fn list_timelines(&self) -> Vec<(ZTimelineId, RepositoryTimeline<Self::Timeline>)>;
fn list_timelines(&self) -> Vec<(ZTimelineId, RepositoryTimeline<Timeline>)>;
/// Create a new, empty timeline. The caller is responsible for loading data into it
/// Initdb lsn is provided for timeline impl to be able to perform checks for some operations against it.
@@ -208,7 +203,7 @@ pub trait Repository: Send + Sync {
&self,
timeline_id: ZTimelineId,
initdb_lsn: Lsn,
) -> Result<Arc<Self::Timeline>>;
) -> Result<Arc<Timeline>>;
/// Branch a timeline
fn branch_timeline(
@@ -305,81 +300,6 @@ impl AddAssign for GcResult {
}
}
pub trait Timeline: Send + Sync {
//------------------------------------------------------------------------------
// Public GET functions
//------------------------------------------------------------------------------
///
/// Wait until WAL has been received and processed up to this LSN.
///
/// You should call this before any of the other get_* or list_* functions. Calling
/// those functions with an LSN that has been processed yet is an error.
///
fn wait_lsn(&self, lsn: Lsn) -> Result<()>;
/// Lock and get timeline's GC cuttof
fn get_latest_gc_cutoff_lsn(&self) -> RwLockReadGuard<Lsn>;
/// Look up given page version.
///
/// NOTE: It is considered an error to 'get' a key that doesn't exist. The abstraction
/// above this needs to store suitable metadata to track what data exists with
/// what keys, in separate metadata entries. If a non-existent key is requested,
/// the Repository implementation may incorrectly return a value from an ancestor
/// branch, for example, or waste a lot of cycles chasing the non-existing key.
///
fn get(&self, key: Key, lsn: Lsn) -> Result<Bytes>;
/// Get the ancestor's timeline id
fn get_ancestor_timeline_id(&self) -> Option<ZTimelineId>;
/// Get the LSN where this branch was created
fn get_ancestor_lsn(&self) -> Lsn;
//------------------------------------------------------------------------------
// Public PUT functions, to update the repository with new page versions.
//
// These are called by the WAL receiver to digest WAL records.
//------------------------------------------------------------------------------
/// Atomically get both last and prev.
fn get_last_record_rlsn(&self) -> RecordLsn;
/// Get last or prev record separately. Same as get_last_record_rlsn().last/prev.
fn get_last_record_lsn(&self) -> Lsn;
fn get_prev_record_lsn(&self) -> Lsn;
fn get_disk_consistent_lsn(&self) -> Lsn;
/// Mutate the timeline with a [`TimelineWriter`].
///
/// FIXME: This ought to return &'a TimelineWriter, where TimelineWriter
/// is a generic type in this trait. But that doesn't currently work in
/// Rust: https://rust-lang.github.io/rfcs/1598-generic_associated_types.html
fn writer<'a>(&'a self) -> Box<dyn TimelineWriter + 'a>;
///
/// Flush to disk all data that was written with the put_* functions
///
/// NOTE: This has nothing to do with checkpoint in PostgreSQL. We don't
/// know anything about them here in the repository.
fn checkpoint(&self, cconf: CheckpointConfig) -> Result<()>;
///
/// Check that it is valid to request operations with that lsn.
fn check_lsn_is_in_scope(
&self,
lsn: Lsn,
latest_gc_cutoff_lsn: &RwLockReadGuard<Lsn>,
) -> Result<()>;
/// Get the physical size of the timeline at the latest LSN
fn get_physical_size(&self) -> u64;
/// Get the physical size of the timeline at the latest LSN non incrementally
fn get_physical_size_non_incremental(&self) -> Result<u64>;
}
/// Various functions to mutate the timeline.
// TODO Currently, Deref is used to allow easy access to read methods from this trait.
// This is probably considered a bad practice in Rust and should be fixed eventually,
@@ -581,6 +501,9 @@ pub mod repo_harness {
#[allow(clippy::bool_assert_comparison)]
#[cfg(test)]
mod tests {
use crate::layered_repository::Timeline;
use crate::CheckpointConfig;
use super::repo_harness::*;
use super::*;
//use postgres_ffi::{pg_constants, xlog_utils::SIZEOF_CHECKPOINT};
@@ -689,7 +612,7 @@ mod tests {
Ok(())
}
fn make_some_layers<T: Timeline>(tline: &T, start_lsn: Lsn) -> Result<()> {
fn make_some_layers(tline: &Timeline, start_lsn: Lsn) -> Result<()> {
let mut lsn = start_lsn;
#[allow(non_snake_case)]
{

View File

@@ -3,7 +3,7 @@
use crate::config::PageServerConf;
use crate::http::models::TenantInfo;
use crate::layered_repository::{load_metadata, LayeredRepository, LayeredTimeline};
use crate::layered_repository::{load_metadata, LayeredRepository, Timeline};
use crate::repository::Repository;
use crate::storage_sync::index::{RemoteIndex, RemoteTimelineIndex};
use crate::storage_sync::{self, LocalTimelineInitStatus, SyncStartupData};
@@ -100,7 +100,7 @@ struct Tenant {
///
/// Local timelines have more metadata that's loaded into memory,
/// that is located in the `repo.timelines` field, [`crate::layered_repository::LayeredTimelineEntry`].
local_timelines: HashMap<ZTimelineId, Arc<LayeredTimeline>>,
local_timelines: HashMap<ZTimelineId, Arc<Timeline>>,
}
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
@@ -177,7 +177,7 @@ pub enum LocalTimelineUpdate {
},
Attach {
id: ZTenantTimelineId,
datadir: Arc<LayeredTimeline>,
datadir: Arc<Timeline>,
},
}
@@ -379,7 +379,7 @@ pub fn get_repository_for_tenant(tenant_id: ZTenantId) -> anyhow::Result<Arc<Lay
pub fn get_local_timeline_with_load(
tenant_id: ZTenantId,
timeline_id: ZTimelineId,
) -> anyhow::Result<Arc<LayeredTimeline>> {
) -> anyhow::Result<Arc<Timeline>> {
let mut m = tenants_state::write_tenants();
let tenant = m
.get_mut(&tenant_id)
@@ -486,7 +486,7 @@ pub fn detach_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> any
fn load_local_timeline(
repo: &LayeredRepository,
timeline_id: ZTimelineId,
) -> anyhow::Result<Arc<LayeredTimeline>> {
) -> anyhow::Result<Arc<Timeline>> {
let inmem_timeline = repo.get_timeline_load(timeline_id).with_context(|| {
format!("Inmem timeline {timeline_id} not found in tenant's repository")
})?;

View File

@@ -20,15 +20,15 @@ use utils::{
use crate::import_datadir;
use crate::tenant_mgr;
use crate::CheckpointConfig;
use crate::{
config::PageServerConf, repository::Repository, storage_sync::index::RemoteIndex,
tenant_config::TenantConfOpt,
};
use crate::{
layered_repository::{LayeredRepository, LayeredTimeline},
layered_repository::{LayeredRepository, Timeline},
walredo::WalRedoManager,
};
use crate::{repository::Timeline, CheckpointConfig};
#[derive(Debug, Clone, Copy)]
pub struct PointInTime {
@@ -160,7 +160,7 @@ pub(crate) fn create_timeline(
new_timeline_id: Option<ZTimelineId>,
ancestor_timeline_id: Option<ZTimelineId>,
mut ancestor_start_lsn: Option<Lsn>,
) -> Result<Option<(ZTimelineId, Arc<LayeredTimeline>)>> {
) -> Result<Option<(ZTimelineId, Arc<Timeline>)>> {
let new_timeline_id = new_timeline_id.unwrap_or_else(ZTimelineId::generate);
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?;

View File

@@ -30,6 +30,7 @@ use anyhow::Result;
use bytes::{Buf, Bytes, BytesMut};
use tracing::*;
use crate::layered_repository::Timeline;
use crate::pgdatadir_mapping::*;
use crate::reltag::{RelTag, SlruKind};
use crate::walrecord::*;
@@ -43,15 +44,15 @@ use utils::lsn::Lsn;
static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; 8192]);
pub struct WalIngest<'a, T: DatadirTimeline> {
timeline: &'a T,
pub struct WalIngest<'a> {
timeline: &'a Timeline,
checkpoint: CheckPoint,
checkpoint_modified: bool,
}
impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
pub fn new(timeline: &T, startpoint: Lsn) -> Result<WalIngest<T>> {
impl<'a> WalIngest<'a> {
pub fn new(timeline: &Timeline, startpoint: Lsn) -> Result<WalIngest> {
// Fetch the latest checkpoint into memory, so that we can compare with it
// quickly in `ingest_record` and update it when it changes.
let checkpoint_bytes = timeline.get_checkpoint(startpoint)?;
@@ -77,7 +78,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
&mut self,
recdata: Bytes,
lsn: Lsn,
modification: &mut DatadirModification<T>,
modification: &mut DatadirModification,
decoded: &mut DecodedWALRecord,
) -> Result<()> {
modification.lsn = lsn;
@@ -266,7 +267,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
fn ingest_decoded_block(
&mut self,
modification: &mut DatadirModification<T>,
modification: &mut DatadirModification,
lsn: Lsn,
decoded: &DecodedWALRecord,
blk: &DecodedBkpBlock,
@@ -326,7 +327,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
fn ingest_heapam_record(
&mut self,
buf: &mut Bytes,
modification: &mut DatadirModification<T>,
modification: &mut DatadirModification,
decoded: &mut DecodedWALRecord,
) -> Result<()> {
// Handle VM bit updates that are implicitly part of heap records.
@@ -470,7 +471,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
/// Subroutine of ingest_record(), to handle an XLOG_DBASE_CREATE record.
fn ingest_xlog_dbase_create(
&mut self,
modification: &mut DatadirModification<T>,
modification: &mut DatadirModification,
rec: &XlCreateDatabase,
) -> Result<()> {
let db_id = rec.db_id;
@@ -537,7 +538,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
fn ingest_xlog_smgr_create(
&mut self,
modification: &mut DatadirModification<T>,
modification: &mut DatadirModification,
rec: &XlSmgrCreate,
) -> Result<()> {
let rel = RelTag {
@@ -555,7 +556,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
/// This is the same logic as in PostgreSQL's smgr_redo() function.
fn ingest_xlog_smgr_truncate(
&mut self,
modification: &mut DatadirModification<T>,
modification: &mut DatadirModification,
rec: &XlSmgrTruncate,
) -> Result<()> {
let spcnode = rec.rnode.spcnode;
@@ -620,7 +621,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
///
fn ingest_xact_record(
&mut self,
modification: &mut DatadirModification<T>,
modification: &mut DatadirModification,
parsed: &XlXactParsedRecord,
is_commit: bool,
) -> Result<()> {
@@ -689,7 +690,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
fn ingest_clog_truncate_record(
&mut self,
modification: &mut DatadirModification<T>,
modification: &mut DatadirModification,
xlrec: &XlClogTruncate,
) -> Result<()> {
info!(
@@ -747,7 +748,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
fn ingest_multixact_create_record(
&mut self,
modification: &mut DatadirModification<T>,
modification: &mut DatadirModification,
xlrec: &XlMultiXactCreate,
) -> Result<()> {
// Create WAL record for updating the multixact-offsets page
@@ -826,7 +827,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
fn ingest_multixact_truncate_record(
&mut self,
modification: &mut DatadirModification<T>,
modification: &mut DatadirModification,
xlrec: &XlMultiXactTruncate,
) -> Result<()> {
self.checkpoint.oldestMulti = xlrec.end_trunc_off;
@@ -860,7 +861,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
fn ingest_relmap_page(
&mut self,
modification: &mut DatadirModification<T>,
modification: &mut DatadirModification,
xlrec: &XlRelmapUpdate,
decoded: &DecodedWALRecord,
) -> Result<()> {
@@ -876,7 +877,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
fn put_rel_creation(
&mut self,
modification: &mut DatadirModification<T>,
modification: &mut DatadirModification,
rel: RelTag,
) -> Result<()> {
modification.put_rel_creation(rel, 0)?;
@@ -885,7 +886,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
fn put_rel_page_image(
&mut self,
modification: &mut DatadirModification<T>,
modification: &mut DatadirModification,
rel: RelTag,
blknum: BlockNumber,
img: Bytes,
@@ -897,7 +898,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
fn put_rel_wal_record(
&mut self,
modification: &mut DatadirModification<T>,
modification: &mut DatadirModification,
rel: RelTag,
blknum: BlockNumber,
rec: ZenithWalRecord,
@@ -909,7 +910,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
fn put_rel_truncation(
&mut self,
modification: &mut DatadirModification<T>,
modification: &mut DatadirModification,
rel: RelTag,
nblocks: BlockNumber,
) -> Result<()> {
@@ -917,11 +918,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
Ok(())
}
fn put_rel_drop(
&mut self,
modification: &mut DatadirModification<T>,
rel: RelTag,
) -> Result<()> {
fn put_rel_drop(&mut self, modification: &mut DatadirModification, rel: RelTag) -> Result<()> {
modification.put_rel_drop(rel)?;
Ok(())
}
@@ -937,7 +934,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
fn handle_rel_extend(
&mut self,
modification: &mut DatadirModification<T>,
modification: &mut DatadirModification,
rel: RelTag,
blknum: BlockNumber,
) -> Result<()> {
@@ -968,7 +965,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
fn put_slru_page_image(
&mut self,
modification: &mut DatadirModification<T>,
modification: &mut DatadirModification,
kind: SlruKind,
segno: u32,
blknum: BlockNumber,
@@ -981,7 +978,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
fn handle_slru_extend(
&mut self,
modification: &mut DatadirModification<T>,
modification: &mut DatadirModification,
kind: SlruKind,
segno: u32,
blknum: BlockNumber,
@@ -1032,9 +1029,9 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
#[cfg(test)]
mod tests {
use super::*;
use crate::layered_repository::Timeline;
use crate::pgdatadir_mapping::create_test_timeline;
use crate::repository::repo_harness::*;
use crate::repository::Timeline;
use postgres_ffi::v14::xlog_utils::SIZEOF_CHECKPOINT;
use postgres_ffi::RELSEG_SIZE;
@@ -1046,13 +1043,13 @@ mod tests {
forknum: 0,
};
fn assert_current_logical_size<T: Timeline>(_timeline: &T, _lsn: Lsn) {
fn assert_current_logical_size(_timeline: &Timeline, _lsn: Lsn) {
// TODO
}
static ZERO_CHECKPOINT: Bytes = Bytes::from_static(&[0u8; SIZEOF_CHECKPOINT]);
fn init_walingest_test<T: DatadirTimeline>(tline: &T) -> Result<WalIngest<T>> {
fn init_walingest_test(tline: &Timeline) -> Result<WalIngest> {
let mut m = tline.begin_modification(Lsn(0x10));
m.put_checkpoint(ZERO_CHECKPOINT.clone())?;
m.put_relmap_file(0, 111, Bytes::from(""))?; // dummy relmapper file

View File

@@ -16,7 +16,7 @@ use std::{
time::Duration,
};
use crate::{layered_repository::LayeredTimeline, repository::Timeline};
use crate::layered_repository::Timeline;
use anyhow::Context;
use chrono::{NaiveDateTime, Utc};
use etcd_broker::{
@@ -39,7 +39,7 @@ pub(super) fn spawn_connection_manager_task(
id: ZTenantTimelineId,
broker_loop_prefix: String,
mut client: Client,
local_timeline: Arc<LayeredTimeline>,
local_timeline: Arc<Timeline>,
wal_connect_timeout: Duration,
lagging_wal_timeout: Duration,
max_lsn_wal_lag: NonZeroU64,
@@ -242,7 +242,7 @@ const WALCONNECTION_RETRY_BACKOFF_MULTIPLIER: f64 = 1.5;
struct WalreceiverState {
id: ZTenantTimelineId,
/// Use pageserver data about the timeline to filter out some of the safekeepers.
local_timeline: Arc<LayeredTimeline>,
local_timeline: Arc<Timeline>,
/// The timeout on the connection to safekeeper for WAL streaming.
wal_connect_timeout: Duration,
/// The timeout to use to determine when the current connection is "stale" and reconnect to the other one.
@@ -300,7 +300,7 @@ struct EtcdSkTimeline {
impl WalreceiverState {
fn new(
id: ZTenantTimelineId,
local_timeline: Arc<LayeredTimeline>,
local_timeline: Arc<Timeline>,
wal_connect_timeout: Duration,
lagging_wal_timeout: Duration,
max_lsn_wal_lag: NonZeroU64,

View File

@@ -20,11 +20,7 @@ use tracing::{debug, error, info, info_span, trace, warn, Instrument};
use super::TaskEvent;
use crate::{
layered_repository::WalReceiverInfo,
pgdatadir_mapping::DatadirTimeline,
repository::{Repository, Timeline},
tenant_mgr,
walingest::WalIngest,
layered_repository::WalReceiverInfo, repository::Repository, tenant_mgr, walingest::WalIngest,
walrecord::DecodedWALRecord,
};
use postgres_ffi::v14::waldecoder::WalStreamDecoder;