Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-04 20:12:54 +00:00
Revert a bunch of commits that I pushed by accident
This reverts commits:
e35a5aa550a389c2ed7f11ebcb531f8d2b61f4d1
882f549236ddb7155bbe

Those were follow-up work on top of PR https://github.com/zenithdb/zenith/pull/430, but they were still very much not ready.
@@ -35,19 +35,15 @@ use crate::PageServerConf;
use crate::{ZTenantId, ZTimelineId};

use zenith_metrics::{register_histogram_vec, HistogramVec};
use zenith_metrics::{register_histogram, Histogram};
use zenith_utils::bin_ser::BeSer;
use zenith_utils::lsn::{AtomicLsn, Lsn};
use zenith_utils::seqwait::SeqWait;

mod filename;
mod image_layer;
mod inmemory_layer;
mod layer_map;
mod snapshot_layer;
mod storage_layer;

use image_layer::ImageLayer;
use inmemory_layer::InMemoryLayer;
use layer_map::LayerMap;
use snapshot_layer::SnapshotLayer;
@@ -58,14 +54,14 @@ static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; 8192]);
// Timeout when waiting for WAL receiver to catch up to an LSN given in a GetPage@LSN call.
static TIMEOUT: Duration = Duration::from_secs(60);

// Flush out an inmemory layer, if it's holding WAL older than
// this. This puts a backstop on how much WAL needs to be re-digested
// if the page server is restarted.
// Perform a checkpoint in the GC thread, when the LSN has advanced this much since
// last checkpoint. This puts a backstop on how much WAL needs to be re-digested if
// the page server is restarted.
//
// FIXME: This current value is very low. I would imagine something like 1 GB or 10 GB
// would be more appropriate. But a low value forces the code to be exercised more,
// which is good for now to trigger bugs.
static OLDEST_INMEM_DISTANCE: u64 = 16 * 1024 * 1024;
static CHECKPOINT_INTERVAL: u64 = 16 * 1024 * 1024;

// Metrics collected on operations on the storage repository.
lazy_static! {
@@ -77,16 +73,6 @@ lazy_static! {
    .expect("failed to define a metric");
}


// Metrics collected on operations on the storage repository.
lazy_static! {
    static ref RECONSTRUCT_TIME: Histogram = register_histogram!(
        "pageserver_getpage_reconstruct_time",
        "FIXME Time spent on storage operations"
    )
    .expect("failed to define a metric");
}

///
/// Repository consists of multiple timelines. Keep them in a hash table.
///
@@ -275,11 +261,11 @@ impl LayeredRepository {
        {
            let timelines = self.timelines.lock().unwrap();
            for (_timelineid, timeline) in timelines.iter() {
                STORAGE_TIME
                    .with_label_values(&["checkpoint_timed"])
                    .observe_closure_duration(
                        || timeline.checkpoint_internal(false)
                    )?
                let distance = u64::from(timeline.last_valid_lsn.load())
                    - u64::from(timeline.last_checkpoint_lsn.load());
                if distance > CHECKPOINT_INTERVAL {
                    timeline.checkpoint()?;
                }
            }
            // release lock on 'timelines'
        }
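The new code in the last hunk replaces the unconditional per-timeline checkpoint with a distance check: the GC thread checkpoints a timeline only once the WAL has advanced more than CHECKPOINT_INTERVAL bytes past the last checkpoint. A minimal sketch of that trigger, with a plain u64 newtype standing in for the real AtomicLsn (illustrative only, not part of the patch):

// Simplified stand-ins for the real types.
struct Lsn(u64);

struct TimelineState {
    last_valid_lsn: Lsn,      // how far WAL has been received
    last_checkpoint_lsn: Lsn, // where the previous checkpoint finished
}

const CHECKPOINT_INTERVAL: u64 = 16 * 1024 * 1024; // same 16 MB backstop as above

fn needs_checkpoint(tl: &TimelineState) -> bool {
    // If more WAL than CHECKPOINT_INTERVAL arrived since the last checkpoint,
    // re-digesting it after a page server restart would be too expensive.
    tl.last_valid_lsn.0 - tl.last_checkpoint_lsn.0 > CHECKPOINT_INTERVAL
}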
@@ -470,7 +456,7 @@ pub struct LayeredTimeline {
    last_record_lsn: AtomicLsn,
    prev_record_lsn: AtomicLsn,

    oldest_pending_lsn: AtomicLsn,
    last_checkpoint_lsn: AtomicLsn,

    // Parent timeline that this timeline was branched from, and the LSN
    // of the branch point.
@@ -499,10 +485,7 @@ impl Timeline for LayeredTimeline {
        let seg = SegmentTag::from_blknum(rel, blknum);

        if let Some((layer, lsn)) = self.get_layer_for_read(seg, lsn)? {
            RECONSTRUCT_TIME
                .observe_closure_duration(|| {
                    self.materialize_page(seg, blknum, lsn, &*layer)
                })
            self.materialize_page(seg, blknum, lsn, &*layer)
        } else {
            bail!("relish {} not found at {}", rel, lsn);
        }
@@ -791,8 +774,8 @@ impl Timeline for LayeredTimeline {
    /// metrics collection.
    fn checkpoint(&self) -> Result<()> {
        STORAGE_TIME
            .with_label_values(&["checkpoint_force"])
            .observe_closure_duration(|| self.checkpoint_internal(true))
            .with_label_values(&["checkpoint"])
            .observe_closure_duration(|| self.checkpoint_internal())
    }

    /// Remember that WAL has been received and added to the page cache up to the given LSN
@@ -884,7 +867,7 @@ impl LayeredTimeline {
            last_valid_lsn: SeqWait::new(metadata.last_valid_lsn),
            last_record_lsn: AtomicLsn::new(metadata.last_record_lsn.0),
            prev_record_lsn: AtomicLsn::new(metadata.prev_record_lsn.0),
            oldest_pending_lsn: AtomicLsn::new(metadata.last_valid_lsn.0),
            last_checkpoint_lsn: AtomicLsn::new(metadata.last_valid_lsn.0),

            ancestor_timeline: ancestor,
            ancestor_lsn: metadata.ancestor_lsn,
@@ -901,33 +884,19 @@ impl LayeredTimeline {
            self.timelineid
        );
        let mut layers = self.layers.lock().unwrap();
        let (snapfilenames, imgfilenames) =
            filename::list_snapshot_files(self.conf, self.timelineid, self.tenantid)?;

        for filename in imgfilenames.iter() {
            let layer = ImageLayer::load_image_layer(self.conf, self.timelineid, self.tenantid, filename)?;

            info!(
                "found layer {} {} on timeline {}",
                layer.get_seg_tag(),
                layer.get_start_lsn(),
                self.timelineid
            );
            layers.insert_historic(Arc::new(layer));
        }

        for filename in snapfilenames.iter() {
            let layer = SnapshotLayer::load_snapshot_layer(self.conf, self.timelineid, self.tenantid, filename)?;
        let snapfiles =
            SnapshotLayer::list_snapshot_files(self.conf, self.timelineid, self.tenantid)?;

        for layer_rc in snapfiles.iter() {
            info!(
                "found layer {} {}-{} {} on timeline {}",
                layer.get_seg_tag(),
                layer.get_start_lsn(),
                layer.get_end_lsn(),
                layer.is_dropped(),
                layer_rc.get_seg_tag(),
                layer_rc.get_start_lsn(),
                layer_rc.get_end_lsn(),
                layer_rc.is_dropped(),
                self.timelineid
            );
            layers.insert_historic(Arc::new(layer));
            layers.insert_historic(Arc::clone(layer_rc));
        }

        Ok(())
@@ -1034,23 +1003,23 @@ impl LayeredTimeline {
        let layer;
        if let Some((prev_layer, _prev_lsn)) = self.get_layer_for_read(seg, lsn)? {
            // Create new entry after the previous one.
            let start_lsn;
            let lsn;
            if prev_layer.get_timeline_id() != self.timelineid {
                // First modification on this timeline
                start_lsn = self.ancestor_lsn;
                lsn = self.ancestor_lsn;
                trace!(
                    "creating file for write for {} at branch point {}/{}",
                    seg,
                    self.timelineid,
                    start_lsn
                    lsn
                );
            } else {
                start_lsn = prev_layer.get_end_lsn();
                lsn = prev_layer.get_end_lsn();
                trace!(
                    "creating file for write for {} after previous layer {}/{}",
                    seg,
                    self.timelineid,
                    start_lsn
                    lsn
                );
            }
            trace!(
@@ -1059,12 +1028,12 @@ impl LayeredTimeline {
                prev_layer.get_start_lsn(),
                prev_layer.get_end_lsn()
            );
            layer = InMemoryLayer::create_successor_layer(
            layer = InMemoryLayer::copy_snapshot(
                self.conf,
                prev_layer,
                &self,
                &*prev_layer,
                self.timelineid,
                self.tenantid,
                start_lsn,
                lsn,
            )?;
        } else {
@@ -1076,7 +1045,7 @@ impl LayeredTimeline {
                lsn
            );

            layer = InMemoryLayer::create(self.conf, self.timelineid, self.tenantid, seg, lsn, lsn)?;
            layer = InMemoryLayer::create(self.conf, self.timelineid, self.tenantid, seg, lsn)?;
        }

        let mut layers = self.layers.lock().unwrap();
@@ -1119,7 +1088,7 @@ impl LayeredTimeline {
    ///
    /// NOTE: This has nothing to do with checkpoint in PostgreSQL. We don't
    /// know anything about them here in the repository.
    fn checkpoint_internal(&self, force: bool) -> Result<()> {
    fn checkpoint_internal(&self) -> Result<()> {
        let last_valid_lsn = self.last_valid_lsn.load();
        let last_record_lsn = self.last_record_lsn.load();
        let prev_record_lsn = self.prev_record_lsn.load();
@@ -1161,34 +1130,22 @@ impl LayeredTimeline {
        // Call freeze() on any unfrozen layers (that is, layers that haven't
        // been written to disk yet).
        // Call unload() on all frozen layers, to release memory.

        let mut oldest_pending_lsn = last_valid_lsn;

        while let Some(oldest_layer) = layers.get_oldest_open_layer() {

            oldest_pending_lsn = oldest_layer.get_oldest_pending_lsn();
            let distance = last_valid_lsn.0 - oldest_pending_lsn.0;
            if !force && distance < OLDEST_INMEM_DISTANCE {
                info!("the oldest layer is now {} which is {} bytes behind last_valid_lsn",
                      oldest_layer.get_seg_tag(), distance);
                break;
            }

            let (new_historics, new_open) = oldest_layer.freeze(last_valid_lsn, &self)?;
        let mut iter = layers.iter_open_layers();
        while let Some(layer) = iter.next() {
            let (new_historic, new_open) = layer.freeze(last_valid_lsn, &self)?;

            // replace this layer with the new layers that 'freeze' returned
            layers.pop_oldest();
            if let Some(n) = new_open {
                layers.insert_open(n);
            }
            for historic in new_historics {
            // (removes it if new_open is None)
            iter.replace(new_open);

            if let Some(historic) = new_historic {
                trace!(
                    "freeze returned layer {} {}-{}",
                    historic.get_seg_tag(),
                    historic.get_start_lsn(),
                    historic.get_end_lsn()
                );
                layers.insert_historic(historic);
                iter.insert_historic(historic);
            }
        }

@@ -1214,7 +1171,7 @@ impl LayeredTimeline {
        };
        LayeredRepository::save_metadata(self.conf, self.timelineid, self.tenantid, &metadata)?;

        self.oldest_pending_lsn.store(oldest_pending_lsn);
        self.last_checkpoint_lsn.store(last_valid_lsn);

        Ok(())
    }
@@ -1245,20 +1202,22 @@ impl LayeredTimeline {
        let now = Instant::now();
        let mut result: GcResult = Default::default();

        // Scan all snapshot files in the directory. For each file, if a newer file
        // exists, we can remove the old one.
        self.checkpoint()?;

        let mut layers = self.layers.lock().unwrap();

        info!(
            "running GC on timeline {}, cutoff {}",
            self.timelineid, cutoff
        );

        let mut layers_to_remove: Vec<Arc<dyn Layer>> = Vec::new();
        let mut layers_to_remove: Vec<Arc<SnapshotLayer>> = Vec::new();

        // Scan all snapshot files in the directory. For each file, if a newer file
        // exists, we can remove the old one.
        //
        // Determine for each file if it needs to be retained
        // FIXME: also scan open in-memory layers. Normally we cannot remove the
        // latest layer of any seg, but if it was unlinked it's possible
        let mut layers = self.layers.lock().unwrap();
        'outer: for l in layers.iter_historic_layers() {
            let seg = l.get_seg_tag();
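Both the layer-loading and GC code above depend on the LayerMap finding, for a segment, the historic layer that covers a given LSN. A rough sketch of that lookup, assuming a per-segment BTreeMap keyed by start LSN as the LayerMap hunks later in this diff describe (simplified types, not the actual implementation):

use std::collections::BTreeMap;
use std::sync::Arc;

#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct Lsn(u64);
struct Layer { start_lsn: Lsn, end_lsn: Lsn }

// Latest historic layer whose start LSN is <= the requested LSN; the layer
// only answers for LSNs below its end LSN, otherwise the caller falls back
// to the open in-memory layer or the ancestor timeline.
fn lookup(historic: &BTreeMap<Lsn, Arc<Layer>>, lsn: Lsn) -> Option<Arc<Layer>> {
    historic
        .range(..=lsn)
        .next_back()
        .map(|(_start, layer)| Arc::clone(layer))
        .filter(|layer| lsn < layer.end_lsn)
}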
@@ -1,306 +0,0 @@
use crate::layered_repository::storage_layer::{SegmentTag};
use crate::relish::*;
use crate::PageServerConf;
use crate::{ZTenantId, ZTimelineId};
use std::fmt;
use std::fs;

use anyhow::{Result};
use log::*;
use zenith_utils::lsn::Lsn;

#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
pub struct SnapshotFileName {
    pub seg: SegmentTag,
    pub start_lsn: Lsn,
    pub end_lsn: Lsn,
    pub dropped: bool,
}

impl SnapshotFileName {
    fn from_str(fname: &str) -> Option<Self> {
        // Split the filename into parts
        //
        // <spcnode>_<dbnode>_<relnode>_<forknum>_<seg>_<start LSN>_<end LSN>
        //
        // or if it was dropped:
        //
        // <spcnode>_<dbnode>_<relnode>_<forknum>_<seg>_<start LSN>_<end LSN>_DROPPED
        //
        let rel;
        let mut parts;
        if let Some(rest) = fname.strip_prefix("rel_") {
            parts = rest.split('_');
            rel = RelishTag::Relation(RelTag {
                spcnode: parts.next()?.parse::<u32>().ok()?,
                dbnode: parts.next()?.parse::<u32>().ok()?,
                relnode: parts.next()?.parse::<u32>().ok()?,
                forknum: parts.next()?.parse::<u8>().ok()?,
            });
        } else if let Some(rest) = fname.strip_prefix("pg_xact_") {
            parts = rest.split('_');
            rel = RelishTag::Slru {
                slru: SlruKind::Clog,
                segno: u32::from_str_radix(parts.next()?, 16).ok()?,
            };
        } else if let Some(rest) = fname.strip_prefix("pg_multixact_members_") {
            parts = rest.split('_');
            rel = RelishTag::Slru {
                slru: SlruKind::MultiXactMembers,
                segno: u32::from_str_radix(parts.next()?, 16).ok()?,
            };
        } else if let Some(rest) = fname.strip_prefix("pg_multixact_offsets_") {
            parts = rest.split('_');
            rel = RelishTag::Slru {
                slru: SlruKind::MultiXactOffsets,
                segno: u32::from_str_radix(parts.next()?, 16).ok()?,
            };
        } else if let Some(rest) = fname.strip_prefix("pg_filenodemap_") {
            parts = rest.split('_');
            rel = RelishTag::FileNodeMap {
                spcnode: parts.next()?.parse::<u32>().ok()?,
                dbnode: parts.next()?.parse::<u32>().ok()?,
            };
        } else if let Some(rest) = fname.strip_prefix("pg_twophase_") {
            parts = rest.split('_');
            rel = RelishTag::TwoPhase {
                xid: parts.next()?.parse::<u32>().ok()?,
            };
        } else if let Some(rest) = fname.strip_prefix("pg_control_checkpoint_") {
            parts = rest.split('_');
            rel = RelishTag::Checkpoint;
        } else if let Some(rest) = fname.strip_prefix("pg_control_") {
            parts = rest.split('_');
            rel = RelishTag::ControlFile;
        } else {
            return None;
        }

        let segno = parts.next()?.parse::<u32>().ok()?;

        let seg = SegmentTag {
            rel,
            segno
        };

        let start_lsn = Lsn::from_hex(parts.next()?).ok()?;
        let end_lsn = Lsn::from_hex(parts.next()?).ok()?;

        let mut dropped = false;
        if let Some(suffix) = parts.next() {
            if suffix == "DROPPED" {
                dropped = true;
            } else {
                warn!("unrecognized filename in timeline dir: {}", fname);
                return None;
            }
        }
        if parts.next().is_some() {
            warn!("unrecognized filename in timeline dir: {}", fname);
            return None;
        }

        Some(SnapshotFileName {
            seg,
            start_lsn,
            end_lsn,
            dropped,
        })
    }

    fn to_string(&self) -> String {
        let basename = match self.seg.rel {
            RelishTag::Relation(reltag) => format!(
                "rel_{}_{}_{}_{}",
                reltag.spcnode, reltag.dbnode, reltag.relnode, reltag.forknum
            ),
            RelishTag::Slru {
                slru: SlruKind::Clog,
                segno,
            } => format!("pg_xact_{:04X}", segno),
            RelishTag::Slru {
                slru: SlruKind::MultiXactMembers,
                segno,
            } => format!("pg_multixact_members_{:04X}", segno),
            RelishTag::Slru {
                slru: SlruKind::MultiXactOffsets,
                segno,
            } => format!("pg_multixact_offsets_{:04X}", segno),
            RelishTag::FileNodeMap { spcnode, dbnode } => {
                format!("pg_filenodemap_{}_{}", spcnode, dbnode)
            }
            RelishTag::TwoPhase { xid } => format!("pg_twophase_{}", xid),
            RelishTag::Checkpoint => format!("pg_control_checkpoint"),
            RelishTag::ControlFile => format!("pg_control"),
        };

        format!(
            "{}_{}_{:016X}_{:016X}{}",
            basename,
            self.seg.segno,
            u64::from(self.start_lsn),
            u64::from(self.end_lsn),
            if self.dropped { "_DROPPED" } else { "" }
        )
    }
}

impl fmt::Display for SnapshotFileName {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.to_string())
    }
}

#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
pub struct ImageFileName {
    pub seg: SegmentTag,
    pub lsn: Lsn,
}

impl ImageFileName {
    fn from_str(fname: &str) -> Option<Self> {
        // Split the filename into parts
        //
        // <spcnode>_<dbnode>_<relnode>_<forknum>_<seg>_<start LSN>_<end LSN>
        //
        // or if it was dropped:
        //
        // <spcnode>_<dbnode>_<relnode>_<forknum>_<seg>_<start LSN>_<end LSN>_DROPPED
        //
        let rel;
        let mut parts;
        if let Some(rest) = fname.strip_prefix("rel_") {
            parts = rest.split('_');
            rel = RelishTag::Relation(RelTag {
                spcnode: parts.next()?.parse::<u32>().ok()?,
                dbnode: parts.next()?.parse::<u32>().ok()?,
                relnode: parts.next()?.parse::<u32>().ok()?,
                forknum: parts.next()?.parse::<u8>().ok()?,
            });
        } else if let Some(rest) = fname.strip_prefix("pg_xact_") {
            parts = rest.split('_');
            rel = RelishTag::Slru {
                slru: SlruKind::Clog,
                segno: u32::from_str_radix(parts.next()?, 16).ok()?,
            };
        } else if let Some(rest) = fname.strip_prefix("pg_multixact_members_") {
            parts = rest.split('_');
            rel = RelishTag::Slru {
                slru: SlruKind::MultiXactMembers,
                segno: u32::from_str_radix(parts.next()?, 16).ok()?,
            };
        } else if let Some(rest) = fname.strip_prefix("pg_multixact_offsets_") {
            parts = rest.split('_');
            rel = RelishTag::Slru {
                slru: SlruKind::MultiXactOffsets,
                segno: u32::from_str_radix(parts.next()?, 16).ok()?,
            };
        } else if let Some(rest) = fname.strip_prefix("pg_filenodemap_") {
            parts = rest.split('_');
            rel = RelishTag::FileNodeMap {
                spcnode: parts.next()?.parse::<u32>().ok()?,
                dbnode: parts.next()?.parse::<u32>().ok()?,
            };
        } else if let Some(rest) = fname.strip_prefix("pg_twophase_") {
            parts = rest.split('_');
            rel = RelishTag::TwoPhase {
                xid: parts.next()?.parse::<u32>().ok()?,
            };
        } else if let Some(rest) = fname.strip_prefix("pg_control_checkpoint_") {
            parts = rest.split('_');
            rel = RelishTag::Checkpoint;
        } else if let Some(rest) = fname.strip_prefix("pg_control_") {
            parts = rest.split('_');
            rel = RelishTag::ControlFile;
        } else {
            return None;
        }

        let segno = parts.next()?.parse::<u32>().ok()?;

        let seg = SegmentTag {
            rel,
            segno
        };

        let lsn = Lsn::from_hex(parts.next()?).ok()?;

        if parts.next().is_some() {
            warn!("unrecognized filename in timeline dir: {}", fname);
            return None;
        }

        Some(ImageFileName {
            seg,
            lsn,
        })
    }

    fn to_string(&self) -> String {
        let basename = match self.seg.rel {
            RelishTag::Relation(reltag) => format!(
                "rel_{}_{}_{}_{}",
                reltag.spcnode, reltag.dbnode, reltag.relnode, reltag.forknum
            ),
            RelishTag::Slru {
                slru: SlruKind::Clog,
                segno,
            } => format!("pg_xact_{:04X}", segno),
            RelishTag::Slru {
                slru: SlruKind::MultiXactMembers,
                segno,
            } => format!("pg_multixact_members_{:04X}", segno),
            RelishTag::Slru {
                slru: SlruKind::MultiXactOffsets,
                segno,
            } => format!("pg_multixact_offsets_{:04X}", segno),
            RelishTag::FileNodeMap { spcnode, dbnode } => {
                format!("pg_filenodemap_{}_{}", spcnode, dbnode)
            }
            RelishTag::TwoPhase { xid } => format!("pg_twophase_{}", xid),
            RelishTag::Checkpoint => format!("pg_control_checkpoint"),
            RelishTag::ControlFile => format!("pg_control"),
        };

        format!(
            "{}_{}_{:016X}",
            basename,
            self.seg.segno,
            u64::from(self.lsn),
        )
    }
}

impl fmt::Display for ImageFileName {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.to_string())
    }
}


/// Create SnapshotLayers representing all files on disk
///
// TODO: returning an Iterator would be more idiomatic
pub fn list_snapshot_files(
    conf: &'static PageServerConf,
    timelineid: ZTimelineId,
    tenantid: ZTenantId,
) -> Result<(Vec<SnapshotFileName>, Vec<ImageFileName>)> {
    let path = conf.timeline_path(&timelineid, &tenantid);

    let mut snapfiles: Vec<SnapshotFileName> = Vec::new();
    let mut imgfiles: Vec<ImageFileName> = Vec::new();
    for direntry in fs::read_dir(path)? {
        let fname = direntry?.file_name();
        let fname = fname.to_str().unwrap();

        if let Some(snapfilename) = SnapshotFileName::from_str(fname) {
            snapfiles.push(snapfilename);
        }

        if let Some(imgfilename) = ImageFileName::from_str(fname) {
            imgfiles.push(imgfilename);
        }
    }
    return Ok((snapfiles, imgfiles));
}
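The deleted filename.rs above encodes a layer's whole key into its file name. As a round-trip illustration with made-up values (the real code maps the parts onto RelishTag and SegmentTag instead of raw tuples):

fn main() {
    let (spc, db, rel, fork, segno) = (1663u32, 13990u32, 2609u32, 0u8, 0u32);
    let (start_lsn, end_lsn) = (0x169C348u64, 0x169C349u64);
    let dropped = false;

    // to_string(): basename, segment number, LSN range, optional drop marker.
    let name = format!(
        "rel_{}_{}_{}_{}_{}_{:016X}_{:016X}{}",
        spc, db, rel, fork, segno, start_lsn, end_lsn,
        if dropped { "_DROPPED" } else { "" }
    );
    assert_eq!(name, "rel_1663_13990_2609_0_0_000000000169C348_000000000169C349");

    // from_str(): strip the "rel_" prefix and split on '_', exactly as
    // SnapshotFileName::from_str does above.
    let parts: Vec<&str> = name.strip_prefix("rel_").unwrap().split('_').collect();
    assert_eq!(parts.len(), 7);
    assert_eq!(u64::from_str_radix(parts[5], 16).unwrap(), start_lsn);
}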
@@ -1,384 +0,0 @@
//! FIXME
//! A SnapshotLayer represents one snapshot file on disk. One file holds all page
//! version and size information of one relation, in a range of LSN.
//! The name "snapshot file" is a bit of a misnomer because a snapshot file doesn't
//! contain a snapshot at a specific LSN, but rather all the page versions in a range
//! of LSNs.
//!
//! Currently, a snapshot file contains full information needed to reconstruct any
//! page version in the LSN range, without consulting any other snapshot files. When
//! a new snapshot file is created for writing, the full contents of relation are
//! materialized as it is at the beginning of the LSN range. That can be very expensive,
//! we should find a way to store differential files. But this keeps the read-side
//! of things simple. You can find the correct snapshot file based on RelishTag and
//! timeline+LSN, and once you've located it, you have all the data you need to in that
//! file.
//!
//! When a snapshot file needs to be accessed, we slurp the whole file into memory, into
//! the SnapshotLayer struct. See load() and unload() functions.
//!
//! On disk, the snapshot files are stored in timelines/<timelineid> directory.
//! Currently, there are no subdirectories, and each snapshot file is named like this:
//!
//! <spcnode>_<dbnode>_<relnode>_<forknum>_<start LSN>_<end LSN>
//!
//! For example:
//!
//! 1663_13990_2609_0_000000000169C348_000000000169C349
//!
//! If a relation is dropped, we add a '_DROPPED' to the end of the filename to indicate that.
//! So the above example would become:
//!
//! 1663_13990_2609_0_000000000169C348_000000000169C349_DROPPED
//!
//! The end LSN indicates when it was dropped in that case, we don't store it in the
//! file contents in any way.
//!
//! A snapshot file is constructed using the 'bookfile' crate. Each file consists of two
//! parts: the page versions and the relation sizes. They are stored as separate chapters.
//! FIXME
//!
use crate::layered_repository::storage_layer::{Layer, PageReconstructData, SegmentTag};
use crate::layered_repository::LayeredTimeline;
use crate::layered_repository::filename::{ImageFileName};
use crate::layered_repository::RELISH_SEG_SIZE;
use crate::PageServerConf;
use crate::{ZTenantId, ZTimelineId};
use anyhow::{bail, Result};
use bytes::Bytes;
use lazy_static::lazy_static;
use log::*;
use std::fs;
use std::fs::File;
use std::io::Write;
use std::path::PathBuf;
use std::sync::{Mutex, MutexGuard};

use bookfile::{Book, BookWriter};

use zenith_metrics::{register_histogram, Histogram};
use zenith_utils::bin_ser::BeSer;
use zenith_utils::lsn::Lsn;

// Magic constant to identify a Zenith segment image file
static IMAGE_FILE_MAGIC: u32 = 0x5A616E01 + 1;

static BASE_IMAGES_CHAPTER: u64 = 1;


// Metrics collected on operations on the storage repository.
lazy_static! {
    static ref RECONSTRUCT_TIME: Histogram = register_histogram!(
        "pageserver_image_reconstruct_time",
        "FIXME Time spent on storage operations"
    )
    .expect("failed to define a metric");
}

///
/// SnapshotLayer is the in-memory data structure associated with an
/// on-disk snapshot file. We keep a SnapshotLayer in memory for each
/// file, in the LayerMap. If a layer is in "loaded" state, we have a
/// copy of the file in memory, in 'inner'. Otherwise the struct is
/// just a placeholder for a file that exists on disk, and it needs to
/// be loaded before using it in queries.
///
pub struct ImageLayer {
    conf: &'static PageServerConf,
    pub tenantid: ZTenantId,
    pub timelineid: ZTimelineId,
    pub seg: SegmentTag,

    // This entry contains an image of all pages as of this LSN
    pub lsn: Lsn,

    inner: Mutex<ImageLayerInner>,
}

pub struct ImageLayerInner {
    /// If false, the 'page_versions' and 'relsizes' have not been
    /// loaded into memory yet.
    loaded: bool,

    // indexed by block number (within segment)
    base_images: Vec<Bytes>,
}

impl Layer for ImageLayer {
    fn get_timeline_id(&self) -> ZTimelineId {
        return self.timelineid;
    }

    fn get_seg_tag(&self) -> SegmentTag {
        return self.seg;
    }

    fn is_dropped(&self) -> bool {
        return false;
    }

    fn get_start_lsn(&self) -> Lsn {
        return self.lsn;
    }

    fn get_end_lsn(&self) -> Lsn {
        return self.lsn;
    }

    /// Look up given page in the cache.
    fn get_page_reconstruct_data(
        &self,
        blknum: u32,
        lsn: Lsn,
        reconstruct_data: &mut PageReconstructData,
    ) -> Result<Option<Lsn>> {
        let need_base_image_lsn: Option<Lsn>;

        assert!(lsn >= self.lsn);

        {
            let inner = self.load()?;

            let base_blknum: usize = (blknum % RELISH_SEG_SIZE) as usize;
            if let Some(img) = inner.base_images.get(base_blknum) {
                reconstruct_data.page_img = Some(img.clone());
                need_base_image_lsn = None;
            } else {
                bail!("no base img found for {} at blk {} at LSN {}", self.seg, base_blknum, lsn);
            }
            // release lock on 'inner'
        }

        Ok(need_base_image_lsn)
    }

    /// Get size of the relation at given LSN
    fn get_seg_size(&self, _lsn: Lsn) -> Result<u32> {

        let inner = self.load()?;
        let result = inner.base_images.len() as u32;

        Ok(result)
    }

    /// Does this segment exist at given LSN?
    fn get_seg_exists(&self, _lsn: Lsn) -> Result<bool> {
        Ok(true)
    }


    ///
    /// Release most of the memory used by this layer. If it's accessed again later,
    /// it will need to be loaded back.
    ///
    fn unload(&self) -> Result<()> {
        let mut inner = self.inner.lock().unwrap();
        inner.base_images = Vec::new();
        inner.loaded = false;
        Ok(())
    }

    fn delete(&self) -> Result<()> {
        // delete underlying file
        fs::remove_file(self.path())?;
        Ok(())
    }

    fn is_incremental(&self) -> bool {
        false
    }
}

impl ImageLayer {
    fn path(&self) -> PathBuf {
        Self::path_for(
            self.conf,
            self.timelineid,
            self.tenantid,
            &ImageFileName {
                seg: self.seg,
                lsn: self.lsn,
            },
        )
    }

    fn path_for(
        conf: &'static PageServerConf,
        timelineid: ZTimelineId,
        tenantid: ZTenantId,
        fname: &ImageFileName,
    ) -> PathBuf {
        conf.timeline_path(&timelineid, &tenantid)
            .join(fname.to_string())
    }

    /// Create a new snapshot file, using the given btreemaps containing the page versions and
    /// relsizes.
    /// FIXME comment
    /// This is used to write the in-memory layer to disk. The in-memory layer uses the same
    /// data structure with two btreemaps as we do, so passing the btreemaps is currently
    /// expedient.
    pub fn create(
        conf: &'static PageServerConf,
        timelineid: ZTimelineId,
        tenantid: ZTenantId,
        seg: SegmentTag,
        lsn: Lsn,
        base_images: Vec<Bytes>,
    ) -> Result<ImageLayer> {

        let layer = ImageLayer {
            conf: conf,
            timelineid: timelineid,
            tenantid: tenantid,
            seg: seg,
            lsn: lsn,
            inner: Mutex::new(ImageLayerInner {
                loaded: true,
                base_images: base_images,
            }),
        };
        let inner = layer.inner.lock().unwrap();

        // Write the images into a file
        let path = layer.path();

        // Note: This overwrites any existing file. There shouldn't be any.
        // FIXME: throw an error instead?
        let file = File::create(&path)?;
        let book = BookWriter::new(file, IMAGE_FILE_MAGIC)?;

        // Write out the base images
        let mut chapter = book.new_chapter(BASE_IMAGES_CHAPTER);
        let buf = Vec::ser(&inner.base_images)?;

        chapter.write_all(&buf)?;
        let book = chapter.close()?;

        book.close()?;

        trace!("saved {}", &path.display());

        drop(inner);

        Ok(layer)
    }

    pub fn create_from_src(
        conf: &'static PageServerConf,
        timeline: &LayeredTimeline,
        src: &dyn Layer,
        lsn: Lsn,
    ) -> Result<ImageLayer> {
        let seg = src.get_seg_tag();
        let timelineid = timeline.timelineid;

        let startblk;
        let size;
        if seg.rel.is_blocky() {
            size = src.get_seg_size(lsn)?;
            startblk = seg.segno * RELISH_SEG_SIZE;
        } else {
            size = 1;
            startblk = 0;
        }

        trace!(
            "creating new ImageLayer for {} on timeline {} at {}",
            seg,
            timelineid,
            lsn,
        );

        let mut base_images: Vec<Bytes> = Vec::new();
        for blknum in startblk..(startblk+size) {
            let img =
                RECONSTRUCT_TIME
                    .observe_closure_duration(|| {
                        timeline.materialize_page(seg, blknum, lsn, &*src)
                    })?;

            base_images.push(img);
        }

        Self::create(conf, timelineid, timeline.tenantid, seg, lsn,
                     base_images)
    }


    ///
    /// Load the contents of the file into memory
    ///
    fn load(&self) -> Result<MutexGuard<ImageLayerInner>> {
        // quick exit if already loaded
        let mut inner = self.inner.lock().unwrap();

        if inner.loaded {
            return Ok(inner);
        }

        let path = Self::path_for(
            self.conf,
            self.timelineid,
            self.tenantid,
            &ImageFileName {
                seg: self.seg,
                lsn: self.lsn,
            },
        );

        let file = File::open(&path)?;
        let book = Book::new(file)?;

        let chapter = book.read_chapter(BASE_IMAGES_CHAPTER)?;
        let base_images = Vec::des(&chapter)?;

        debug!("loaded from {}", &path.display());

        *inner = ImageLayerInner {
            loaded: true,
            base_images,
        };

        Ok(inner)
    }

    /// Create an ImageLayer represent a file on disk
    pub fn load_image_layer(
        conf: &'static PageServerConf,
        timelineid: ZTimelineId,
        tenantid: ZTenantId,
        filename: &ImageFileName,
    ) -> Result<ImageLayer> {
        let layer = ImageLayer {
            conf,
            timelineid,
            tenantid,
            seg: filename.seg,
            lsn: filename.lsn,
            inner: Mutex::new(ImageLayerInner {
                loaded: false,
                base_images: Vec::new(),
            }),
        };

        Ok(layer)
    }

    /// debugging function to print out the contents of the layer
    #[allow(unused)]
    pub fn dump(&self) -> String {
        let mut result = format!(
            "----- image layer for {} at {} ----\n",
            self.seg, self.lsn,
        );

        //let inner = self.inner.lock().unwrap();

        //for (k, v) in inner.page_versions.iter() {
        //    result += &format!("blk {} at {}: {}/{}\n", k.0, k.1, v.page_image.is_some(), v.record.is_some());
        //}

        result
    }
}
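ImageLayer::create and load above funnel the page images through the bookfile crate's chapter API. A condensed sketch of that write/read cycle, reusing only the calls that appear in the deleted file (the path and payload are made up; the real code serializes a Vec<Bytes> with BeSer instead of a raw byte string):

use std::fs::File;
use std::io::Write;
use bookfile::{Book, BookWriter};

static IMAGE_FILE_MAGIC: u32 = 0x5A616E01 + 1; // same magic as above
static BASE_IMAGES_CHAPTER: u64 = 1;

fn roundtrip() -> anyhow::Result<()> {
    // Write one chapter holding the serialized base images.
    let file = File::create("/tmp/demo_image_layer")?; // illustrative path
    let book = BookWriter::new(file, IMAGE_FILE_MAGIC)?;
    let mut chapter = book.new_chapter(BASE_IMAGES_CHAPTER);
    chapter.write_all(b"serialized base images go here")?;
    let book = chapter.close()?;
    book.close()?;

    // Read it back by chapter number, as ImageLayer::load does.
    let book = Book::new(File::open("/tmp/demo_image_layer")?)?;
    let chapter = book.read_chapter(BASE_IMAGES_CHAPTER)?;
    assert_eq!(&chapter[..], b"serialized base images go here");
    Ok(())
}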
@@ -6,7 +6,8 @@ use crate::layered_repository::storage_layer::{
    Layer, PageReconstructData, PageVersion, SegmentTag, RELISH_SEG_SIZE,
};
use crate::layered_repository::LayeredTimeline;
use crate::layered_repository::{ImageLayer, SnapshotLayer};
use crate::layered_repository::SnapshotLayer;
use crate::repository::WALRecord;
use crate::PageServerConf;
use crate::{ZTenantId, ZTimelineId};
use anyhow::{bail, Result};
@@ -31,13 +32,9 @@ pub struct InMemoryLayer {
    ///
    start_lsn: Lsn,

    oldest_pending_lsn: Lsn,

    /// The above fields never change. The parts that do change are in 'inner',
    /// and protected by mutex.
    inner: Mutex<InMemoryLayerInner>,

    img_layer: Option<Arc<dyn Layer>>,
}

pub struct InMemoryLayerInner {
@@ -54,11 +51,6 @@ pub struct InMemoryLayerInner {
    /// `segsizes` tracks the size of the segment at different points in time.
    ///
    segsizes: BTreeMap<Lsn, u32>,

    ///
    /// Memory usage
    ///
    mem_used: usize,
}

impl Layer for InMemoryLayer {
@@ -128,16 +120,7 @@ impl Layer for InMemoryLayer {
            }
        }

        // Use the base image, if needed
        if let Some(need_lsn) = need_base_image_lsn {
            if let Some(img_layer) = &self.img_layer {
                need_base_image_lsn = img_layer.get_page_reconstruct_data(blknum, need_lsn, reconstruct_data)?;
            } else {
                bail!("no base img found for {} at blk {} at LSN {}", self.seg, blknum, lsn);
            }
        }

        // release lock on 'inner'
        // release lock on 'page_versions'
        }

        Ok(need_base_image_lsn)
@@ -145,23 +128,18 @@ impl Layer for InMemoryLayer {

    /// Get size of the relation at given LSN
    fn get_seg_size(&self, lsn: Lsn) -> Result<u32> {
        assert!(lsn >= self.start_lsn);

        // Scan the BTreeMap backwards, starting from the given entry.
        let inner = self.inner.lock().unwrap();
        let mut iter = inner.segsizes.range((Included(&Lsn(0)), Included(&lsn)));

        let result;
        if let Some((_entry_lsn, entry)) = iter.next_back() {
            result = *entry;
        // Use the base image if needed
        } else if let Some(img_layer) = &self.img_layer {
            result = img_layer.get_seg_size(lsn)?;
            let result = *entry;
            drop(inner);
            trace!("get_seg_size: {} at {} -> {}", self.seg, lsn, result);
            Ok(result)
        } else {
            result = 0;
            bail!("No size found for {} at {} in memory", self.seg, lsn);
        }
        trace!("get_seg_size: {} at {} -> {}", self.seg, lsn, result);
        Ok(result)
    }

    /// Does this segment exist at given LSN?
@@ -178,31 +156,9 @@ impl Layer for InMemoryLayer {
        // Otherwise, it exists
        Ok(true)
    }


    ///
    /// Release most of the memory used by this layer. If it's accessed again later,
    /// it will need to be loaded back.
    ///
    fn unload(&self) -> Result<()> {
        Ok(())
    }

    fn delete(&self) -> Result<()> {
        Ok(())
    }

    fn is_incremental(&self) -> bool {
        true
    }
}

impl InMemoryLayer {

    pub fn get_oldest_pending_lsn(&self) -> Lsn {
        self.oldest_pending_lsn
    }

    ///
    /// Create a new, empty, in-memory layer
    ///
@@ -212,7 +168,6 @@ impl InMemoryLayer {
        tenantid: ZTenantId,
        seg: SegmentTag,
        start_lsn: Lsn,
        oldest_pending_lsn: Lsn,
    ) -> Result<InMemoryLayer> {
        trace!(
            "initializing new empty InMemoryLayer for writing {} on timeline {} at {}",
@@ -227,14 +182,11 @@ impl InMemoryLayer {
            tenantid,
            seg,
            start_lsn,
            oldest_pending_lsn,
            inner: Mutex::new(InMemoryLayerInner {
                drop_lsn: None,
                page_versions: BTreeMap::new(),
                segsizes: BTreeMap::new(),
                mem_used: 0,
            }),
            img_layer: None,
        })
    }

@@ -276,9 +228,6 @@ impl InMemoryLayer {
            self.timelineid,
            lsn
        );

        let mem_size = pv.get_mem_size();

        let mut inner = self.inner.lock().unwrap();

        let old = inner.page_versions.insert((blknum, lsn), pv);
@@ -289,8 +238,6 @@ impl InMemoryLayer {
                "Page version of rel {} blk {} at {} already exists",
                self.seg.rel, blknum, lsn
            );
        } else {
            inner.mem_used += mem_size;
        }

        // Also update the relation size, if this extended the relation.
@@ -302,8 +249,6 @@ impl InMemoryLayer {
            let oldsize;
            if let Some((_entry_lsn, entry)) = iter.next_back() {
                oldsize = *entry;
            } else if let Some(img_layer) = &self.img_layer {
                oldsize = img_layer.get_seg_size(lsn)?;
            } else {
                oldsize = 0;
                //bail!("No old size found for {} at {}", self.tag, lsn);
@@ -352,37 +297,56 @@ impl InMemoryLayer {
    /// Initialize a new InMemoryLayer for, by copying the state at the given
    /// point in time from given existing layer.
    ///
    pub fn create_successor_layer(
    pub fn copy_snapshot(
        conf: &'static PageServerConf,
        src: Arc<dyn Layer>,
        timeline: &LayeredTimeline,
        src: &dyn Layer,
        timelineid: ZTimelineId,
        tenantid: ZTenantId,
        start_lsn: Lsn,
        oldest_pending_lsn: Lsn,
        lsn: Lsn,
    ) -> Result<InMemoryLayer> {
        let seg = src.get_seg_tag();

        trace!(
            "initializing new InMemoryLayer for writing {} on timeline {} at {}",
            seg,
            src.get_seg_tag(),
            timelineid,
            start_lsn,
            lsn
        );
        let mut page_versions = BTreeMap::new();
        let mut segsizes = BTreeMap::new();

        let seg = src.get_seg_tag();

        let startblk;
        let size;
        if seg.rel.is_blocky() {
            size = src.get_seg_size(lsn)?;
            segsizes.insert(lsn, size);
            startblk = seg.segno * RELISH_SEG_SIZE;
        } else {
            size = 1;
            startblk = 0;
        }

        for blknum in startblk..(startblk + size) {
            let img = timeline.materialize_page(seg, blknum, lsn, src)?;
            let pv = PageVersion {
                page_image: Some(img),
                record: None,
            };
            page_versions.insert((blknum, lsn), pv);
        }

        Ok(InMemoryLayer {
            conf,
            timelineid,
            tenantid,
            seg,
            start_lsn,
            oldest_pending_lsn,
            seg: src.get_seg_tag(),
            start_lsn: lsn,
            inner: Mutex::new(InMemoryLayerInner {
                drop_lsn: None,
                page_versions: BTreeMap::new(),
                segsizes: BTreeMap::new(),
                mem_used: 0,
                page_versions: page_versions,
                segsizes: segsizes,
            }),
            img_layer: Some(src),
        })
    }

@@ -402,7 +366,7 @@ impl InMemoryLayer {
        cutoff_lsn: Lsn,
        // This is needed just to call materialize_page()
        timeline: &LayeredTimeline,
    ) -> Result<(Vec<Arc<dyn Layer>>, Option<Arc<InMemoryLayer>>)> {
    ) -> Result<(Option<Arc<SnapshotLayer>>, Option<Arc<InMemoryLayer>>)> {
        info!(
            "freezing in memory layer for {} on timeline {} at {}",
            self.seg, self.timelineid, cutoff_lsn
@@ -458,18 +422,6 @@ impl InMemoryLayer {
        // we can release the lock now.
        drop(inner);

        let mut historics: Vec<Arc<dyn Layer>> = Vec::new();

        // write a new base image layer at the cutoff point
        let imgfile = ImageLayer::create_from_src(
            self.conf,
            timeline,
            self,
            end_lsn,
        )?;
        let imgfile_rc: Arc<dyn Layer> = Arc::new(imgfile);
        historics.push(Arc::clone(&imgfile_rc));

        // Write the page versions before the cutoff to disk.
        let snapfile = SnapshotLayer::create(
            self.conf,
@@ -479,38 +431,36 @@ impl InMemoryLayer {
            self.start_lsn,
            end_lsn,
            dropped,
            self.img_layer.clone(),
            before_page_versions,
            before_segsizes,
        )?;
        let snapfile_rc: Arc<dyn Layer> = Arc::new(snapfile);
        historics.push(snapfile_rc);

        // If there were any "new" page versions, initialize a new in-memory layer to hold
        // them
        let new_open =
            if !after_segsizes.is_empty() || !after_page_versions.is_empty() {
                info!("created new in-mem layer for {} {}-", self.seg, end_lsn);
        let new_open = if !after_segsizes.is_empty() || !after_page_versions.is_empty() {
            info!("created new in-mem layer for {} {}-", self.seg, end_lsn);

                let new_open = Self::create_successor_layer(
                    self.conf,
                    imgfile_rc,
                    self.timelineid,
                    self.tenantid,
                    end_lsn,
                    end_lsn,
                )?;
                let mut new_inner = new_open.inner.lock().unwrap();
                new_inner.page_versions.append(&mut after_page_versions);
                new_inner.segsizes.append(&mut after_segsizes);
                drop(new_inner);
            let new_open = Self::copy_snapshot(
                self.conf,
                timeline,
                &snapfile,
                self.timelineid,
                self.tenantid,
                end_lsn,
            )?;
            let mut new_inner = new_open.inner.lock().unwrap();
            new_inner.page_versions.append(&mut after_page_versions);
            new_inner.segsizes.append(&mut after_segsizes);
            drop(new_inner);

                Some(Arc::new(new_open))
            } else {
                None
            };
            Some(Arc::new(new_open))
        } else {
            None
        };

        Ok((historics, new_open))
        let new_historic = Some(Arc::new(snapfile));

        Ok((new_historic, new_open))
    }

    /// debugging function to print out the contents of the layer
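freeze() partitions the in-memory state at the cutoff LSN: page versions up to the cutoff are written out as the new historic layer, anything newer seeds the successor open layer. A simplified sketch of that split over the (block, LSN)-keyed map (hypothetical helper; the real code also splits segment sizes and handles the drop flag):

use std::collections::BTreeMap;

type Lsn = u64;
type PageVersion = Vec<u8>; // stand-in for the real PageVersion struct

// Partition page versions into those at or before the cutoff (frozen to disk)
// and those after it (carried into the new open layer).
fn split_at_cutoff(
    page_versions: BTreeMap<(u32, Lsn), PageVersion>,
    cutoff_lsn: Lsn,
) -> (BTreeMap<(u32, Lsn), PageVersion>, BTreeMap<(u32, Lsn), PageVersion>) {
    let mut before = BTreeMap::new();
    let mut after = BTreeMap::new();
    for ((blknum, lsn), pv) in page_versions {
        if lsn <= cutoff_lsn {
            before.insert((blknum, lsn), pv);
        } else {
            after.insert((blknum, lsn), pv);
        }
    }
    (before, after)
}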
@@ -3,79 +3,38 @@
//!
//! When the timeline is first accessed, the server lists of all snapshot files
//! in the timelines/<timelineid> directory, and populates this map with
//! SnapshotLayers corresponding to each file. When new WAL is received, FIXME
//! SnapshotLayers corresponding to each file. When new WAL is received,
//! we create InMemoryLayers to hold the incoming records. Now and then,
//! in the checkpoint() function, the in-memory layers are frozen, forming
//! new snapshot layers and corresponding files are written to disk.
//!

use crate::layered_repository::storage_layer::{Layer, SegmentTag};
use crate::layered_repository::{InMemoryLayer};
use crate::layered_repository::{InMemoryLayer, SnapshotLayer};
use crate::relish::*;
use anyhow::Result;
use lazy_static::lazy_static;
use log::*;
use std::collections::HashSet;
use std::collections::{BinaryHeap, BTreeMap, HashMap};
use std::collections::{BTreeMap, HashMap};
use std::ops::Bound::Included;
use std::cmp::Ordering;
use std::sync::Arc;
use zenith_metrics::{register_int_gauge, IntGauge};
use zenith_utils::lsn::Lsn;

lazy_static! {
    static ref NUM_INMEMORY_LAYERS: IntGauge =
        register_int_gauge!("pageserver_inmemory_layers", "Number of layers in memory")
            .expect("failed to define a metric");

    static ref NUM_ONDISK_LAYERS: IntGauge =
        register_int_gauge!("pageserver_ondisk_layers", "Number of layers on-disk")
            .expect("failed to define a metric");
}

///
/// LayerMap tracks what layers exist on a timeline. The last layer that is
/// LayerMap tracks what layers exist or a timeline. The last layer that is
/// open for writes is always an InMemoryLayer, and is tracked separately
/// because there can be only one for each segment. The older layers,
/// stored on disk, are kept in a BTreeMap keyed by the layer's start LSN.
///
pub struct LayerMap {
    segs: HashMap<SegmentTag, SegEntry>,

    // FIXME: explain this
    open_segs: BinaryHeap<OpenSegEntry>,
}

struct SegEntry {
    pub open: Option<Arc<InMemoryLayer>>,
    pub historic: BTreeMap<Lsn, Arc<dyn Layer>>,
    pub historic: BTreeMap<Lsn, Arc<SnapshotLayer>>,
}

struct OpenSegEntry {
    pub oldest_pending_lsn: Lsn,
    pub layer: Arc<InMemoryLayer>,
}
impl Ord for OpenSegEntry {
    fn cmp(&self, other: &Self) -> Ordering {
        // BinaryHeap is a max-heap, and we want a min-heap. Reverse the ordering here
        // to get that.
        other.oldest_pending_lsn.cmp(&self.oldest_pending_lsn)
    }
}
impl PartialOrd for OpenSegEntry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        // BinaryHeap is a max-heap, and we want a min-heap. Reverse the ordering here
        // to get that.
        other.oldest_pending_lsn.partial_cmp(&self.oldest_pending_lsn)
    }
}
impl PartialEq for OpenSegEntry {
    fn eq(&self, other: &Self) -> bool {
        self.oldest_pending_lsn.eq(&other.oldest_pending_lsn)
    }
}
impl Eq for OpenSegEntry {}

impl LayerMap {
    ///
    /// Look up using the given segment tag and LSN. This differs from a plain
@@ -129,38 +88,20 @@ impl LayerMap {
            if let Some(_old) = &segentry.open {
                // FIXME: shouldn't exist, but check
            }
            segentry.open = Some(Arc::clone(&layer));
            segentry.open = Some(layer);
        } else {
            let segentry = SegEntry {
                open: Some(Arc::clone(&layer)),
                open: Some(layer),
                historic: BTreeMap::new(),
            };
            self.segs.insert(tag, segentry);
        }

        let opensegentry = OpenSegEntry {
            oldest_pending_lsn: layer.get_oldest_pending_lsn(),
            layer: layer,
        };
        self.open_segs.push(opensegentry);

        NUM_INMEMORY_LAYERS.inc();
    }

    // replace given open layer with other layers.
    pub fn pop_oldest(&mut self) {
        let opensegentry = self.open_segs.pop().unwrap();
        let segtag = opensegentry.layer.get_seg_tag();

        let mut segentry = self.segs.get_mut(&segtag).unwrap();
        segentry.open = None;
        NUM_INMEMORY_LAYERS.dec();
    }

    ///
    /// Insert an on-disk layer
    ///
    pub fn insert_historic(&mut self, layer: Arc<dyn Layer>) {
    pub fn insert_historic(&mut self, layer: Arc<SnapshotLayer>) {
        let tag = layer.get_seg_tag();
        let start_lsn = layer.get_start_lsn();

@@ -176,7 +117,6 @@ impl LayerMap {
            };
            self.segs.insert(tag, segentry);
        }
        NUM_ONDISK_LAYERS.inc();
    }

    ///
@@ -184,14 +124,13 @@ impl LayerMap {
    ///
    /// This should be called when the corresponding file on disk has been deleted.
    ///
    pub fn remove_historic(&mut self, layer: &dyn Layer) {
    pub fn remove_historic(&mut self, layer: &SnapshotLayer) {
        let tag = layer.get_seg_tag();
        let start_lsn = layer.get_start_lsn();

        if let Some(segentry) = self.segs.get_mut(&tag) {
            segentry.historic.remove(&start_lsn);
        }
        NUM_ONDISK_LAYERS.dec();
    }

    pub fn list_rels(&self, spcnode: u32, dbnode: u32) -> Result<HashSet<RelTag>> {
@@ -230,16 +169,14 @@ impl LayerMap {
    /// Is there a newer layer for given segment?
    pub fn newer_layer_exists(&self, seg: SegmentTag, lsn: Lsn) -> bool {
        if let Some(segentry) = self.segs.get(&seg) {
            // open layer is always incremental so it doesn't count
            if let Some(_open) = &segentry.open {
                return true;
            }

            for (newer_lsn, layer) in segentry
                .historic
                .range((Included(lsn), Included(Lsn(u64::MAX))))
            {
                // FIXME: incremental layers don't count
                if layer.is_incremental() {
                    continue;
                }
                if layer.get_end_lsn() > lsn {
                    trace!(
                        "found later layer for {}, {} {}-{}",
@@ -259,11 +196,10 @@ impl LayerMap {
        false
    }

    pub fn get_oldest_open_layer(&mut self) -> Option<Arc<InMemoryLayer>> {
        if let Some(opensegentry) = self.open_segs.peek() {
            Some(Arc::clone(&opensegentry.layer))
        } else {
            None
    pub fn iter_open_layers(&mut self) -> OpenLayerIter {
        OpenLayerIter {
            last: None,
            segiter: self.segs.iter_mut(),
        }
    }

@@ -279,18 +215,53 @@ impl Default for LayerMap {
    fn default() -> Self {
        LayerMap {
            segs: HashMap::new(),
            open_segs: BinaryHeap::new(),
        }
    }
}

pub struct OpenLayerIter<'a> {
    last: Option<&'a mut SegEntry>,

    segiter: std::collections::hash_map::IterMut<'a, SegmentTag, SegEntry>,
}

impl<'a> OpenLayerIter<'a> {
    pub fn replace(&mut self, replacement: Option<Arc<InMemoryLayer>>) {
        let segentry = self.last.as_mut().unwrap();
        segentry.open = replacement;
    }

    pub fn insert_historic(&mut self, new_layer: Arc<SnapshotLayer>) {
        let start_lsn = new_layer.get_start_lsn();

        let segentry = self.last.as_mut().unwrap();
        segentry.historic.insert(start_lsn, new_layer);
    }
}

impl<'a> Iterator for OpenLayerIter<'a> {
    type Item = Arc<InMemoryLayer>;

    fn next(&mut self) -> std::option::Option<<Self as std::iter::Iterator>::Item> {
        while let Some((_seg, entry)) = self.segiter.next() {
            if let Some(open) = &entry.open {
                let op = Arc::clone(&open);
                self.last = Some(entry);
                return Some(op);
            }
        }
        self.last = None;
        None
    }
}

pub struct HistoricLayerIter<'a> {
    segiter: std::collections::hash_map::Iter<'a, SegmentTag, SegEntry>,
    iter: Option<std::collections::btree_map::Iter<'a, Lsn, Arc<dyn Layer>>>,
    iter: Option<std::collections::btree_map::Iter<'a, Lsn, Arc<SnapshotLayer>>>,
}

impl<'a> Iterator for HistoricLayerIter<'a> {
    type Item = Arc<dyn Layer>;
    type Item = Arc<SnapshotLayer>;

    fn next(&mut self) -> std::option::Option<<Self as std::iter::Iterator>::Item> {
        loop {
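The OpenSegEntry impls above use the standard trick of reversing the comparison to turn std's max-heap BinaryHeap into a min-heap, so pop() always yields the open layer holding the oldest pending WAL. A self-contained demonstration of the same idea (std::cmp::Reverse would work equally well):

use std::cmp::Ordering;
use std::collections::BinaryHeap;

#[derive(Eq, PartialEq)]
struct Entry {
    oldest_pending_lsn: u64, // stand-in for Lsn
}

impl Ord for Entry {
    fn cmp(&self, other: &Self) -> Ordering {
        // Comparing other-to-self reverses the order, making the smallest
        // LSN the "greatest" element of the max-heap.
        other.oldest_pending_lsn.cmp(&self.oldest_pending_lsn)
    }
}

impl PartialOrd for Entry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

fn main() {
    let mut heap = BinaryHeap::new();
    heap.push(Entry { oldest_pending_lsn: 300 });
    heap.push(Entry { oldest_pending_lsn: 100 });
    heap.push(Entry { oldest_pending_lsn: 200 });
    assert_eq!(heap.pop().unwrap().oldest_pending_lsn, 100); // oldest first
}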
@@ -36,17 +36,17 @@
|
||||
//!
|
||||
//! A snapshot file is constructed using the 'bookfile' crate. Each file consists of two
|
||||
//! parts: the page versions and the relation sizes. They are stored as separate chapters.
|
||||
//! FIXME
|
||||
//!
|
||||
use crate::layered_repository::storage_layer::{
|
||||
Layer, PageReconstructData, PageVersion, SegmentTag,
|
||||
};
|
||||
use crate::layered_repository::filename::{SnapshotFileName};
|
||||
use crate::relish::*;
|
||||
use crate::PageServerConf;
|
||||
use crate::{ZTenantId, ZTimelineId};
|
||||
use anyhow::{bail, Result};
|
||||
use log::*;
|
||||
use std::collections::BTreeMap;
|
||||
use std::fmt;
|
||||
use std::fs;
|
||||
use std::fs::File;
|
||||
use std::io::Write;
|
||||
@@ -65,6 +65,145 @@ static SNAPSHOT_FILE_MAGIC: u32 = 0x5A616E01;
|
||||
static PAGE_VERSIONS_CHAPTER: u64 = 1;
|
||||
static REL_SIZES_CHAPTER: u64 = 2;
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
|
||||
struct SnapshotFileName {
|
||||
seg: SegmentTag,
|
||||
start_lsn: Lsn,
|
||||
end_lsn: Lsn,
|
||||
dropped: bool,
|
||||
}
|
||||
|
||||
impl SnapshotFileName {
|
||||
fn from_str(fname: &str) -> Option<Self> {
|
||||
// Split the filename into parts
|
||||
//
|
||||
// <spcnode>_<dbnode>_<relnode>_<forknum>_<seg>_<start LSN>_<end LSN>
|
||||
//
|
||||
// or if it was dropped:
|
||||
//
|
||||
// <spcnode>_<dbnode>_<relnode>_<forknum>_<seg>_<start LSN>_<end LSN>_DROPPED
|
||||
//
|
||||
let rel;
|
||||
let mut parts;
|
||||
if let Some(rest) = fname.strip_prefix("rel_") {
|
||||
parts = rest.split('_');
|
||||
rel = RelishTag::Relation(RelTag {
|
||||
spcnode: parts.next()?.parse::<u32>().ok()?,
|
||||
dbnode: parts.next()?.parse::<u32>().ok()?,
|
||||
relnode: parts.next()?.parse::<u32>().ok()?,
|
||||
forknum: parts.next()?.parse::<u8>().ok()?,
|
||||
});
|
||||
} else if let Some(rest) = fname.strip_prefix("pg_xact_") {
|
||||
parts = rest.split('_');
|
||||
rel = RelishTag::Slru {
|
||||
slru: SlruKind::Clog,
|
||||
segno: u32::from_str_radix(parts.next()?, 16).ok()?,
|
||||
};
|
||||
} else if let Some(rest) = fname.strip_prefix("pg_multixact_members_") {
|
||||
parts = rest.split('_');
|
||||
rel = RelishTag::Slru {
|
||||
slru: SlruKind::MultiXactMembers,
|
||||
segno: u32::from_str_radix(parts.next()?, 16).ok()?,
|
||||
};
|
||||
} else if let Some(rest) = fname.strip_prefix("pg_multixact_offsets_") {
|
||||
parts = rest.split('_');
|
||||
rel = RelishTag::Slru {
|
||||
slru: SlruKind::MultiXactOffsets,
|
||||
segno: u32::from_str_radix(parts.next()?, 16).ok()?,
|
||||
};
|
||||
} else if let Some(rest) = fname.strip_prefix("pg_filenodemap_") {
|
||||
parts = rest.split('_');
|
||||
rel = RelishTag::FileNodeMap {
|
||||
spcnode: parts.next()?.parse::<u32>().ok()?,
|
||||
dbnode: parts.next()?.parse::<u32>().ok()?,
|
||||
};
|
||||
} else if let Some(rest) = fname.strip_prefix("pg_twophase_") {
|
||||
parts = rest.split('_');
|
||||
rel = RelishTag::TwoPhase {
|
||||
xid: parts.next()?.parse::<u32>().ok()?,
|
||||
};
|
||||
} else if let Some(rest) = fname.strip_prefix("pg_control_checkpoint_") {
|
||||
parts = rest.split('_');
|
||||
rel = RelishTag::Checkpoint;
|
||||
} else if let Some(rest) = fname.strip_prefix("pg_control_") {
|
||||
parts = rest.split('_');
|
||||
rel = RelishTag::ControlFile;
|
||||
} else {
|
||||
return None;
|
||||
}
|
||||
|
||||
let segno = parts.next()?.parse::<u32>().ok()?;
|
||||
|
||||
let seg = SegmentTag { rel, segno };
|
||||
|
||||
let start_lsn = Lsn::from_hex(parts.next()?).ok()?;
|
||||
let end_lsn = Lsn::from_hex(parts.next()?).ok()?;
|
||||
|
||||
let mut dropped = false;
|
||||
if let Some(suffix) = parts.next() {
|
||||
if suffix == "DROPPED" {
|
||||
dropped = true;
|
||||
} else {
|
||||
warn!("unrecognized filename in timeline dir: {}", fname);
|
||||
return None;
|
||||
}
|
||||
}
|
||||
if parts.next().is_some() {
|
||||
warn!("unrecognized filename in timeline dir: {}", fname);
|
||||
return None;
|
||||
}
|
||||
|
||||
Some(SnapshotFileName {
|
||||
seg,
|
||||
start_lsn,
|
||||
end_lsn,
|
||||
dropped,
|
||||
})
|
||||
}

    fn to_string(&self) -> String {
        let basename = match self.seg.rel {
            RelishTag::Relation(reltag) => format!(
                "rel_{}_{}_{}_{}",
                reltag.spcnode, reltag.dbnode, reltag.relnode, reltag.forknum
            ),
            RelishTag::Slru {
                slru: SlruKind::Clog,
                segno,
            } => format!("pg_xact_{:04X}", segno),
            RelishTag::Slru {
                slru: SlruKind::MultiXactMembers,
                segno,
            } => format!("pg_multixact_members_{:04X}", segno),
            RelishTag::Slru {
                slru: SlruKind::MultiXactOffsets,
                segno,
            } => format!("pg_multixact_offsets_{:04X}", segno),
            RelishTag::FileNodeMap { spcnode, dbnode } => {
                format!("pg_filenodemap_{}_{}", spcnode, dbnode)
            }
            RelishTag::TwoPhase { xid } => format!("pg_twophase_{}", xid),
            RelishTag::Checkpoint => format!("pg_control_checkpoint"),
            RelishTag::ControlFile => format!("pg_control"),
        };

        format!(
            "{}_{}_{:016X}_{:016X}{}",
            basename,
            self.seg.segno,
            u64::from(self.start_lsn),
            u64::from(self.end_lsn),
            if self.dropped { "_DROPPED" } else { "" }
        )
    }
}

impl fmt::Display for SnapshotFileName {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.to_string())
    }
}
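
to_string() and from_str() are intended to round-trip; a hedged test sketch (the test name and the visibility of from_str are assumptions, not part of the commit):

    #[test]
    fn snapshot_file_name_roundtrip() {
        let name = "rel_1663_13008_16396_0_0_0000000001000000_0000000002000000";
        let parsed = SnapshotFileName::from_str(name).unwrap();
        // Re-formatting the parsed name should reproduce the original string.
        assert_eq!(parsed.to_string(), name);
    }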

///
/// SnapshotLayer is the in-memory data structure associated with an
/// on-disk snapshot file. We keep a SnapshotLayer in memory for each
@@ -88,8 +227,6 @@ pub struct SnapshotLayer {
    dropped: bool,

    inner: Mutex<SnapshotLayerInner>,

    img_layer: Option<Arc<dyn Layer>>,
}

pub struct SnapshotLayerInner {
@@ -162,15 +299,6 @@ impl Layer for SnapshotLayer {
            }
        }

        // Use the base image, if needed
        if let Some(need_lsn) = need_base_image_lsn {
            if let Some(img_layer) = &self.img_layer {
                need_base_image_lsn = img_layer.get_page_reconstruct_data(blknum, need_lsn, reconstruct_data)?;
            } else {
                bail!("no base img found for {} at blk {} at LSN {}", self.seg, blknum, lsn);
            }
        }

        // release lock on 'inner'
    }

@@ -179,23 +307,26 @@ impl Layer for SnapshotLayer {

    /// Get size of the relation at given LSN
    fn get_seg_size(&self, lsn: Lsn) -> Result<u32> {

        assert!(lsn >= self.start_lsn);

        // Scan the BTreeMap backwards, starting from the given entry.
        let inner = self.load()?;
        let mut iter = inner.relsizes.range((Included(&Lsn(0)), Included(&lsn)));

        let result;
        if let Some((_entry_lsn, entry)) = iter.next_back() {
            result = *entry;
        // Use the base image if needed
        } else if let Some(img_layer) = &self.img_layer {
            result = img_layer.get_seg_size(lsn)?;
            let result = *entry;
            drop(inner);
            trace!("get_seg_size: {} at {} -> {}", self.seg, lsn, result);
            Ok(result)
        } else {
            result = 0;
            error!(
                "No size found for {} at {} in snapshot layer {} {}-{}",
                self.seg, lsn, self.seg, self.start_lsn, self.end_lsn
            );
            bail!(
                "No size found for {} at {} in snapshot layer",
                self.seg,
                lsn
            );
        }
        Ok(result)
    }
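
The size lookup hinges on scanning the relsizes BTreeMap backwards from the requested LSN; a self-contained sketch of that pattern, with u64 standing in for Lsn:

    use std::collections::BTreeMap;
    use std::ops::Bound::Included;

    // Most recent size recorded at or before `lsn`: take the last entry
    // of the inclusive range 0..=lsn, mirroring iter.next_back() above.
    fn size_at(relsizes: &BTreeMap<u64, u32>, lsn: u64) -> Option<u32> {
        relsizes
            .range((Included(&0), Included(&lsn)))
            .next_back()
            .map(|(_entry_lsn, size)| *size)
    }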

    /// Does this segment exist at given LSN?
@@ -208,28 +339,6 @@ impl Layer for SnapshotLayer {
        // Otherwise, it exists.
        Ok(true)
    }

    ///
    /// Release most of the memory used by this layer. If it's accessed again later,
    /// it will need to be loaded back.
    ///
    fn unload(&self) -> Result<()> {
        let mut inner = self.inner.lock().unwrap();
        inner.page_versions = BTreeMap::new();
        inner.relsizes = BTreeMap::new();
        inner.loaded = false;
        Ok(())
    }

    fn delete(&self) -> Result<()> {
        // delete underlying file
        fs::remove_file(self.path())?;
        Ok(())
    }

    fn is_incremental(&self) -> bool {
        true
    }
}

impl SnapshotLayer {
@@ -271,11 +380,9 @@ impl SnapshotLayer {
        start_lsn: Lsn,
        end_lsn: Lsn,
        dropped: bool,
        img_layer: Option<Arc<dyn Layer>>,
        page_versions: BTreeMap<(u32, Lsn), PageVersion>,
        relsizes: BTreeMap<Lsn, u32>,
    ) -> Result<SnapshotLayer> {

        let snapfile = SnapshotLayer {
            conf: conf,
            timelineid: timelineid,
@@ -289,7 +396,6 @@ impl SnapshotLayer {
                page_versions: page_versions,
                relsizes: relsizes,
            }),
            img_layer,
        };
        let inner = snapfile.inner.lock().unwrap();

@@ -301,7 +407,7 @@ impl SnapshotLayer {
        let file = File::create(&path)?;
        let book = BookWriter::new(file, SNAPSHOT_FILE_MAGIC)?;

        // Write out the other page versions
        // Write out page versions
        let mut chapter = book.new_chapter(PAGE_VERSIONS_CHAPTER);
        let buf = BTreeMap::ser(&inner.page_versions)?;
        chapter.write_all(&buf)?;
@@ -368,30 +474,56 @@ impl SnapshotLayer {
    /// Create SnapshotLayers representing all files on disk
    ///
    // TODO: returning an Iterator would be more idiomatic
    pub fn load_snapshot_layer(
    pub fn list_snapshot_files(
        conf: &'static PageServerConf,
        timelineid: ZTimelineId,
        tenantid: ZTenantId,
        filename: &SnapshotFileName,
    ) -> Result<SnapshotLayer> {
        let snapfile = SnapshotLayer {
            conf,
            timelineid,
            tenantid,
            seg: filename.seg,
            start_lsn: filename.start_lsn,
            end_lsn: filename.end_lsn,
            dropped: filename.dropped,
            inner: Mutex::new(SnapshotLayerInner {
                loaded: false,
                page_versions: BTreeMap::new(),
                relsizes: BTreeMap::new(),
            }),
            // FIXME: This doesn't work across restarts.
            img_layer: None,
        };
    ) -> Result<Vec<Arc<SnapshotLayer>>> {
        let path = conf.timeline_path(&timelineid, &tenantid);

        Ok(snapfile)
        let mut snapfiles: Vec<Arc<SnapshotLayer>> = Vec::new();
        for direntry in fs::read_dir(path)? {
            let fname = direntry?.file_name();
            let fname = fname.to_str().unwrap();

            if let Some(snapfilename) = SnapshotFileName::from_str(fname) {
                let snapfile = SnapshotLayer {
                    conf,
                    timelineid,
                    tenantid,
                    seg: snapfilename.seg,
                    start_lsn: snapfilename.start_lsn,
                    end_lsn: snapfilename.end_lsn,
                    dropped: snapfilename.dropped,
                    inner: Mutex::new(SnapshotLayerInner {
                        loaded: false,
                        page_versions: BTreeMap::new(),
                        relsizes: BTreeMap::new(),
                    }),
                };

                snapfiles.push(Arc::new(snapfile));
            }
        }
        return Ok(snapfiles);
    }
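
A hypothetical caller (not shown in this diff) would invoke the new function at timeline startup to rediscover the layers on disk, roughly:

    // Sketch only: assumes module-private access to the layer fields.
    let snapfiles = SnapshotLayer::list_snapshot_files(conf, timelineid, tenantid)?;
    for layer in &snapfiles {
        trace!("found {} {}-{}", layer.seg, layer.start_lsn, layer.end_lsn);
    }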

    pub fn delete(&self) -> Result<()> {
        // delete underlying file
        fs::remove_file(self.path())?;
        Ok(())
    }

    ///
    /// Release most of the memory used by this layer. If it's accessed again later,
    /// it will need to be loaded back.
    ///
    pub fn unload(&self) -> Result<()> {
        let mut inner = self.inner.lock().unwrap();
        inner.page_versions = BTreeMap::new();
        inner.relsizes = BTreeMap::new();
        inner.loaded = false;
        Ok(())
    }

    /// debugging function to print out the contents of the layer

@@ -69,28 +69,6 @@ pub struct PageVersion {
    pub record: Option<WALRecord>,
}

impl PageVersion {
    pub fn get_mem_size(&self) -> usize {
        let mut sz = 0;

        // every page version has some fixed overhead.
        sz += 16;

        if let Some(img) = &self.page_image {
            sz += img.len();
        }

        if let Some(rec) = &self.record {
            sz += rec.rec.len();

            // Some per-record overhead. Not very accurate, but close enough
            sz += 32;
        }

        sz
    }
}
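
As a worked example of the accounting above: a version holding an 8192-byte page image and no WAL record counts as 16 + 8192 = 8208 bytes, while one holding only a 100-byte WAL record counts as 16 + 100 + 32 = 148 bytes.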

///
/// Data needed to reconstruct a page version
///
@@ -147,9 +125,4 @@ pub trait Layer: Send + Sync {
    fn get_seg_size(&self, lsn: Lsn) -> Result<u32>;

    fn get_seg_exists(&self, lsn: Lsn) -> Result<bool>;

    fn is_incremental(&self) -> bool;

    fn unload(&self) -> Result<()>;
    fn delete(&self) -> Result<()>;
}