Compare commits

...

5 Commits

Author SHA1 Message Date
Heikki Linnakangas
ea56b4a36a WIP: global job queue 2021-10-29 14:00:41 +03:00
Heikki Linnakangas
7cf7215ce2 WIP: Use poll() 2021-10-28 16:23:51 +03:00
Heikki Linnakangas
23713eb44f WIP: cache vfds 2021-10-28 01:03:34 +03:00
Heikki Linnakangas
28675739de WIP 2021-10-27 19:00:40 +03:00
Heikki Linnakangas
ea90d102e2 Refactor 'zenith' CLI subcommand handling
Also fixes 'zenith safekeeper restart -m immediate'. The stop-mode was
previously ignored.
2021-10-27 12:01:21 +03:00
21 changed files with 1196 additions and 530 deletions

3
Cargo.lock generated
View File

@@ -208,8 +208,6 @@ dependencies = [
[[package]]
name = "bookfile"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "efa3e2086414e1bbecbc10730f265e5b079ab4ea0b830e7219a70dab6471e753"
dependencies = [
"aversion",
"byteorder",
@@ -1195,6 +1193,7 @@ dependencies = [
"hyper",
"lazy_static",
"log",
"nix",
"postgres",
"postgres-protocol",
"postgres-types",

View File

@@ -5,7 +5,7 @@ authors = ["Stas Kelvich <stas@zenith.tech>"]
edition = "2018"
[dependencies]
bookfile = "^0.3"
bookfile = { path = "../../bookfile" }
chrono = "0.4.19"
rand = "0.8.3"
regex = "1.4.5"
@@ -37,6 +37,7 @@ async-trait = "0.1"
const_format = "0.2.21"
tracing = "0.1.27"
signal-hook = {version = "0.3.10", features = ["extended-siginfo"] }
nix = "0.23"
postgres_ffi = { path = "../postgres_ffi" }
zenith_metrics = { path = "../zenith_metrics" }

View File

@@ -28,6 +28,7 @@ use daemonize::Daemonize;
use pageserver::{
branches, defaults::*, http, page_service, relish_storage, tenant_mgr, PageServerConf,
layered_repository,
RelishStorageConfig, RelishStorageKind, S3Config, LOG_FILE_NAME,
};
use zenith_utils::http::endpoint;
@@ -549,6 +550,8 @@ fn start_pageserver(conf: &'static PageServerConf) -> Result<()> {
page_service::thread_main(conf, auth, pageserver_listener, conf.auth_type)
})?;
let global_job_thread = layered_repository::launch_global_job_thread(conf);
for info in SignalsInfo::<WithOrigin>::new(TERM_SIGNALS)?.into_iter() {
match info.signal {
SIGQUIT => {
@@ -577,6 +580,12 @@ fn start_pageserver(conf: &'static PageServerConf) -> Result<()> {
.expect("thread panicked")
.expect("thread exited with an error");
}
// Shut down global job thread
global_job_thread
.join()
.expect("thread panicked");
info!("Pageserver shut down successfully completed");
exit(0);
}

View File

@@ -230,7 +230,7 @@ fn bootstrap_timeline(
timeline.writer().as_ref(),
lsn,
)?;
timeline.checkpoint()?;
timeline.checkpoint_forced()?;
println!(
"created initial timeline {} timeline.lsn {}",

View File

@@ -59,6 +59,7 @@ mod image_layer;
mod inmemory_layer;
mod interval_tree;
mod layer_map;
mod jobs;
mod page_versions;
mod storage_layer;
@@ -66,10 +67,12 @@ use delta_layer::DeltaLayer;
use image_layer::ImageLayer;
use inmemory_layer::InMemoryLayer;
use layer_map::LayerMap;
use layer_map::{LayerId, LayerMap};
use storage_layer::{
Layer, PageReconstructData, PageReconstructResult, SegmentTag, RELISH_SEG_SIZE,
};
use jobs::{GlobalJob, schedule_job};
pub use jobs::launch_global_job_thread;
static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; 8192]);
@@ -129,10 +132,17 @@ pub struct LayeredRepository {
/// Makes evey repo's timelines to backup their files to remote storage,
/// when they get frozen.
upload_relishes: bool,
is_gc_scheduled: Mutex<bool>,
}
/// Public interface
impl Repository for LayeredRepository {
fn upgrade_to_layered_repository(&self) -> &crate::layered_repository::LayeredRepository {
self
}
fn get_timeline(&self, timelineid: ZTimelineId) -> Result<Arc<dyn Timeline>> {
let mut timelines = self.timelines.lock().unwrap();
@@ -206,19 +216,33 @@ impl Repository for LayeredRepository {
/// Public entry point to GC. All the logic is in the private
/// gc_iteration_internal function, this public facade just wraps it for
/// metrics collection.
fn gc_iteration(
fn gc_manual(
&self,
target_timelineid: Option<ZTimelineId>,
horizon: u64,
checkpoint_before_gc: bool,
) -> Result<GcResult> {
STORAGE_TIME
.with_label_values(&["gc"])
.with_label_values(&["gc_manual"])
.observe_closure_duration(|| {
self.gc_iteration_internal(target_timelineid, horizon, checkpoint_before_gc)
})
}
fn gc_scheduled(&self) -> Result<GcResult> {
let result = STORAGE_TIME
.with_label_values(&["gc_scheduled"])
.observe_closure_duration(|| {
self.gc_iteration_internal(None, self.conf.gc_horizon, false)
});
let mut guard = self.is_gc_scheduled.lock().unwrap();
*guard = false;
result
}
// Wait for all threads to complete and persist repository data before pageserver shutdown.
fn shutdown(&self) -> Result<()> {
trace!("LayeredRepository shutdown for tenant {}", self.tenantid);
@@ -228,7 +252,7 @@ impl Repository for LayeredRepository {
walreceiver::stop_wal_receiver(*timelineid);
// Wait for syncing data to disk
trace!("repo shutdown. checkpoint timeline {}", timelineid);
timeline.checkpoint()?;
timeline.checkpoint_forced()?;
//TODO Wait for walredo process to shutdown too
}
@@ -306,6 +330,7 @@ impl LayeredRepository {
timelines: Mutex::new(HashMap::new()),
walredo_mgr,
upload_relishes,
is_gc_scheduled: Mutex::new(false),
}
}
@@ -355,44 +380,6 @@ impl LayeredRepository {
Ok(())
}
///
/// Launch the GC thread in given repository.
///
pub fn launch_gc_thread(
conf: &'static PageServerConf,
rc: Arc<LayeredRepository>,
) -> JoinHandle<()> {
std::thread::Builder::new()
.name("GC thread".into())
.spawn(move || {
// FIXME: relaunch it? Panic is not good.
rc.gc_loop(conf).expect("GC thread died");
})
.unwrap()
}
///
/// GC thread's main loop
///
fn gc_loop(&self, conf: &'static PageServerConf) -> Result<()> {
while !tenant_mgr::shutdown_requested() {
// Garbage collect old files that are not needed for PITR anymore
if conf.gc_horizon > 0 {
self.gc_iteration(None, conf.gc_horizon, false).unwrap();
}
// TODO Write it in more adequate way using
// condvar.wait_timeout() or something
let mut sleep_time = conf.gc_period.as_secs();
while sleep_time > 0 && !tenant_mgr::shutdown_requested() {
sleep_time -= 1;
std::thread::sleep(Duration::from_secs(1));
}
info!("gc thread for tenant {} waking up", self.tenantid);
}
Ok(())
}
/// Save timeline metadata to file
fn save_metadata(
conf: &'static PageServerConf,
@@ -546,7 +533,7 @@ impl LayeredRepository {
// so that they too can be garbage collected. That's
// used in tests, so we want as deterministic results as possible.
if checkpoint_before_gc {
timeline.checkpoint()?;
timeline.checkpoint_forced()?;
info!("timeline {} checkpoint_before_gc done", timelineid);
}
@@ -689,10 +676,18 @@ pub struct LayeredTimeline {
/// Must always be acquired before the layer map/individual layer lock
/// to avoid deadlock.
write_lock: Mutex<()>,
is_checkpoint_scheduled: Mutex<bool>,
last_gc: Mutex<Option<Lsn>>,
}
/// Public interface functions
impl Timeline for LayeredTimeline {
fn upgrade_to_layered_timeline(&self) -> &crate::layered_repository::LayeredTimeline {
self
}
fn get_ancestor_lsn(&self) -> Lsn {
self.ancestor_lsn
}
@@ -867,13 +862,25 @@ impl Timeline for LayeredTimeline {
/// Public entry point for checkpoint(). All the logic is in the private
/// checkpoint_internal function, this public facade just wraps it for
/// metrics collection.
fn checkpoint(&self) -> Result<()> {
fn checkpoint_forced(&self) -> Result<()> {
STORAGE_TIME
.with_label_values(&["checkpoint_force"])
.with_label_values(&["checkpoint_forced"])
//pass checkpoint_distance=0 to force checkpoint
.observe_closure_duration(|| self.checkpoint_internal(0, true))
}
fn checkpoint_scheduled(&self) -> Result<()> {
let result = STORAGE_TIME
.with_label_values(&["checkpoint_scheduled"])
.observe_closure_duration(|| self.checkpoint_internal(self.conf.checkpoint_distance, false));
let mut guard = self.is_checkpoint_scheduled.lock().unwrap();
*guard = false;
result
}
fn get_last_record_lsn(&self) -> Lsn {
self.last_record_lsn.load().last
}
@@ -977,6 +984,9 @@ impl LayeredTimeline {
upload_relishes,
write_lock: Mutex::new(()),
is_checkpoint_scheduled: Mutex::new(false),
last_gc: Mutex::new(None),
};
Ok(timeline)
}
@@ -1145,7 +1155,7 @@ impl LayeredTimeline {
///
/// Get a handle to the latest layer for appending.
///
fn get_layer_for_write(&self, seg: SegmentTag, lsn: Lsn) -> Result<Arc<InMemoryLayer>> {
fn get_layer_for_write(&self, seg: SegmentTag, lsn: Lsn) -> Result<Arc<dyn Layer>> {
let mut layers = self.layers.lock().unwrap();
assert!(lsn.is_aligned());
@@ -1160,7 +1170,9 @@ impl LayeredTimeline {
// Do we have a layer open for writing already?
let layer;
if let Some(open_layer) = layers.get_open(&seg) {
if let Some(open_layer_arc) = layers.get_open(&seg) {
let open_layer = open_layer_arc.upgrade_to_inmemory_layer().expect("open layer is not an in-memory layer");
if open_layer.get_start_lsn() > lsn {
bail!("unexpected open layer in the future");
}
@@ -1185,7 +1197,7 @@ impl LayeredTimeline {
lsn,
)?;
} else {
return Ok(open_layer);
return Ok(open_layer_arc);
}
}
// No writeable layer for this relation. Create one.
@@ -1276,9 +1288,10 @@ impl LayeredTimeline {
// a lot of memory and/or aren't receiving much updates anymore.
let mut disk_consistent_lsn = last_record_lsn;
let mut created_historics = false;
let mut layer_uploads = Vec::new();
while let Some((oldest_layer, oldest_generation)) = layers.peek_oldest_open() {
while let Some((oldest_layer_id, oldest_layer_arc, oldest_generation)) = layers.peek_oldest_open() {
let oldest_layer = oldest_layer_arc.upgrade_to_inmemory_layer().expect("open layer is not an in-memory layer");
let oldest_pending_lsn = oldest_layer.get_oldest_pending_lsn();
if tenant_mgr::shutdown_requested() && !forced {
@@ -1307,62 +1320,26 @@ impl LayeredTimeline {
break;
}
// Mark the layer as no longer accepting writes and record the end_lsn.
// This happens in-place, no new layers are created now.
// We call `get_last_record_lsn` again, which may be different from the
// original load, as we may have released the write lock since then.
oldest_layer.freeze(self.get_last_record_lsn());
// The layer is no longer open, update the layer map to reflect this.
// We will replace it with on-disk historics below.
layers.pop_oldest_open();
layers.insert_historic(oldest_layer.clone());
// Write the now-frozen layer to disk. That could take a while, so release the lock while do it
drop(layers);
drop(write_guard);
let new_historics = oldest_layer.write_to_disk(self)?;
// Evict the layer
self.evict_layer(oldest_layer_id)?;
write_guard = self.write_lock.lock().unwrap();
layers = self.layers.lock().unwrap();
if !new_historics.is_empty() {
created_historics = true;
}
// Finally, replace the frozen in-memory layer with the new on-disk layers
layers.remove_historic(oldest_layer);
// Add the historics to the LayerMap
for delta_layer in new_historics.delta_layers {
layer_uploads.push(delta_layer.path());
layers.insert_historic(Arc::new(delta_layer));
}
for image_layer in new_historics.image_layers {
layer_uploads.push(image_layer.path());
layers.insert_historic(Arc::new(image_layer));
}
}
// Call unload() on all frozen layers, to release memory.
// This shouldn't be much memory, as only metadata is slurped
// into memory.
for layer in layers.iter_historic_layers() {
for (_layer_id, layer) in layers.iter_historic_layers() {
layer.unload()?;
}
drop(layers);
drop(write_guard);
if created_historics {
// We must fsync the timeline dir to ensure the directory entries for
// new layer files are durable
let timeline_dir =
File::open(self.conf.timeline_path(&self.timelineid, &self.tenantid))?;
timeline_dir.sync_all()?;
}
// If we were able to advance 'disk_consistent_lsn', save it the metadata file.
// After crash, we will restart WAL streaming and processing from that point.
let old_disk_consistent_lsn = self.disk_consistent_lsn.load();
@@ -1408,6 +1385,131 @@ impl LayeredTimeline {
Ok(())
}
pub fn schedule_checkpoint_if_needed(&self) -> Result<()> {
let mut guard = self.is_checkpoint_scheduled.lock().unwrap();
if *guard == true {
return Ok(());
}
let RecordLsn {
last: last_record_lsn,
prev: _prev_record_lsn,
} = self.last_record_lsn.load();
let mut layers = self.layers.lock().unwrap();
if let Some((_oldest_layer_id, oldest_layer_arc, _oldest_generation)) = layers.peek_oldest_open() {
let oldest_layer = oldest_layer_arc.upgrade_to_inmemory_layer().expect("open layer is not an in-memory layer");
let oldest_pending_lsn = oldest_layer.get_oldest_pending_lsn();
let distance = last_record_lsn.widening_sub(oldest_pending_lsn);
if distance > self.conf.checkpoint_distance.into() {
schedule_job(GlobalJob::CheckpointTimeline(self.tenantid, self.timelineid));
*guard = true;
}
}
Ok(())
}
pub fn schedule_gc_if_needed(&self) -> Result<()> {
let RecordLsn {
last: last_record_lsn,
prev: _prev_record_lsn,
} = self.last_record_lsn.load();
let gc_needed = {
let last_gc = self.last_gc.lock().unwrap();
if let Some(last_gc) = *last_gc {
let distance = last_record_lsn.widening_sub(last_gc);
if distance > std::cmp::max(10*1024*1024, self.conf.gc_horizon / 2) as i128 {
true
} else {
false
}
} else {
true
}
};
if !gc_needed {
return Ok(());
}
let repo = tenant_mgr::get_repository_for_tenant(self.tenantid)?;
let repo = repo.upgrade_to_layered_repository();
let mut gc_scheduled = repo.is_gc_scheduled.lock().unwrap();
if *gc_scheduled == true {
return Ok(());
}
schedule_job(GlobalJob::GarbageCollect(self.tenantid));
*gc_scheduled = true;
Ok(())
}
fn evict_layer(&self, layer_id: LayerId) -> Result<()> {
let mut write_guard = self.write_lock.lock().unwrap();
let mut layers = self.layers.lock().unwrap();
if let Some(victim_layer_arc) = layers.get_with_id(layer_id) {
if let Some(victim_layer) = victim_layer_arc.upgrade_to_inmemory_layer() {
// Mark the layer as no longer accepting writes and record the end_lsn.
// This happens in-place, no new layers are created now.
// We call `get_last_record_lsn` again, which may be different from the
// original load, as we may have released the write lock since then.
victim_layer.freeze(self.get_last_record_lsn());
// The layer is no longer open, update the layer map to reflect this.
// We will replace it with on-disk historics below.
layers.remove(layer_id);
let frozen_layer_id = layers.insert_historic(victim_layer_arc.clone());
// Write the now-frozen layer to disk. That could take a while, so release the lock while do it
drop(layers);
drop(write_guard);
let new_historics = victim_layer.write_to_disk(self)?;
let created_historics = !new_historics.is_empty();
write_guard = self.write_lock.lock().unwrap();
layers = self.layers.lock().unwrap();
// Finally, replace the frozen in-memory layer with the new on-disk layers
layers.remove(frozen_layer_id);
// Add the historics to the LayerMap
for delta_layer in new_historics.delta_layers {
// FIXME layer_uploads.push(delta_layer.path());
layers.insert_historic(Arc::new(delta_layer));
}
for image_layer in new_historics.image_layers {
// FIXME layer_uploads.push(image_layer.path());
layers.insert_historic(Arc::new(image_layer));
}
if created_historics {
// We must fsync the timeline dir to ensure the directory entries for
// new layer files are durable
//
// TODO: it's inefficient to do this after every eviction, if we're evicting
// a lot of layers.
let timeline_dir =
File::open(self.conf.timeline_path(&self.timelineid, &self.tenantid))?;
timeline_dir.sync_all()?;
}
drop(layers);
drop(write_guard);
}
}
Ok(())
}
///
/// Garbage collect layer files on a timeline that are no longer needed.
///
@@ -1440,7 +1542,7 @@ impl LayeredTimeline {
debug!("retain_lsns: {:?}", retain_lsns);
let mut layers_to_remove: Vec<Arc<dyn Layer>> = Vec::new();
let mut layers_to_remove: Vec<LayerId> = Vec::new();
// Scan all on-disk layers in the timeline.
//
@@ -1451,7 +1553,7 @@ impl LayeredTimeline {
// 4. this layer doesn't serve as a tombstone for some older layer;
//
let mut layers = self.layers.lock().unwrap();
'outer: for l in layers.iter_historic_layers() {
'outer: for (layer_id, l) in layers.iter_historic_layers() {
let seg = l.get_seg_tag();
if seg.rel.is_relation() {
@@ -1462,7 +1564,7 @@ impl LayeredTimeline {
// 1. Is it newer than cutoff point?
if l.get_end_lsn() > cutoff {
info!(
trace!(
"keeping {} {}-{} because it's newer than cutoff {}",
seg,
l.get_start_lsn(),
@@ -1481,7 +1583,7 @@ impl LayeredTimeline {
for retain_lsn in &retain_lsns {
// start_lsn is inclusive and end_lsn is exclusive
if l.get_start_lsn() <= *retain_lsn && *retain_lsn < l.get_end_lsn() {
info!(
trace!(
"keeping {} {}-{} because it's needed by branch point {}",
seg,
l.get_start_lsn(),
@@ -1500,7 +1602,7 @@ impl LayeredTimeline {
// 3. Is there a later on-disk layer for this relation?
if !l.is_dropped() && !layers.newer_image_layer_exists(l.get_seg_tag(), l.get_end_lsn())
{
info!(
trace!(
"keeping {} {}-{} because it is the latest layer",
seg,
l.get_start_lsn(),
@@ -1569,7 +1671,7 @@ impl LayeredTimeline {
}
if is_tombstone {
info!(
trace!(
"keeping {} {}-{} because this layer servers as a tombstome for older layer",
seg,
l.get_start_lsn(),
@@ -1593,27 +1695,32 @@ impl LayeredTimeline {
l.get_end_lsn(),
l.is_dropped()
);
layers_to_remove.push(Arc::clone(&l));
layers_to_remove.push(layer_id);
}
// Actually delete the layers from disk and remove them from the map.
// (couldn't do this in the loop above, because you cannot modify a collection
// while iterating it. BTreeMap::retain() would be another option)
for doomed_layer in layers_to_remove {
doomed_layer.delete()?;
layers.remove_historic(doomed_layer.clone());
for doomed_layer_id in layers_to_remove {
if let Some(doomed_layer) = layers.get_with_id(doomed_layer_id) {
match (
doomed_layer.is_dropped(),
doomed_layer.get_seg_tag().rel.is_relation(),
) {
(true, true) => result.ondisk_relfiles_dropped += 1,
(true, false) => result.ondisk_nonrelfiles_dropped += 1,
(false, true) => result.ondisk_relfiles_removed += 1,
(false, false) => result.ondisk_nonrelfiles_removed += 1,
doomed_layer.delete()?;
layers.remove(doomed_layer_id);
match (
doomed_layer.is_dropped(),
doomed_layer.get_seg_tag().rel.is_relation(),
) {
(true, true) => result.ondisk_relfiles_dropped += 1,
(true, false) => result.ondisk_nonrelfiles_dropped += 1,
(false, true) => result.ondisk_relfiles_removed += 1,
(false, false) => result.ondisk_nonrelfiles_removed += 1,
}
}
}
let mut guard = self.last_gc.lock().unwrap();
*guard = Some(cutoff);
result.elapsed = now.elapsed();
Ok(result)
}
@@ -1806,9 +1913,10 @@ impl<'a> TimelineWriter for LayeredTimelineWriter<'a> {
let seg = SegmentTag::from_blknum(rel, blknum);
let layer = self.tl.get_layer_for_write(seg, lsn)?;
let delta_size = layer.put_wal_record(lsn, blknum, rec);
let delta_size = layer.upgrade_to_inmemory_layer().unwrap().put_wal_record(lsn, blknum, rec);
self.tl
.increase_current_logical_size(delta_size * BLCKSZ as u32);
Ok(())
}
@@ -1825,7 +1933,7 @@ impl<'a> TimelineWriter for LayeredTimelineWriter<'a> {
let seg = SegmentTag::from_blknum(rel, blknum);
let layer = self.tl.get_layer_for_write(seg, lsn)?;
let delta_size = layer.put_page_image(blknum, lsn, img);
let delta_size = layer.upgrade_to_inmemory_layer().unwrap().put_page_image(blknum, lsn, img);
self.tl
.increase_current_logical_size(delta_size * BLCKSZ as u32);
@@ -1870,7 +1978,7 @@ impl<'a> TimelineWriter for LayeredTimelineWriter<'a> {
};
let layer = self.tl.get_layer_for_write(seg, lsn)?;
layer.drop_segment(lsn);
layer.upgrade_to_inmemory_layer().unwrap().drop_segment(lsn);
}
// Truncate the last remaining segment to the specified size
@@ -1880,7 +1988,7 @@ impl<'a> TimelineWriter for LayeredTimelineWriter<'a> {
segno: last_remain_seg,
};
let layer = self.tl.get_layer_for_write(seg, lsn)?;
layer.put_truncation(lsn, relsize % RELISH_SEG_SIZE)
layer.upgrade_to_inmemory_layer().unwrap().put_truncation(lsn, relsize % RELISH_SEG_SIZE)
}
self.tl
.decrease_current_logical_size((oldsize - relsize) * BLCKSZ as u32);
@@ -1908,7 +2016,7 @@ impl<'a> TimelineWriter for LayeredTimelineWriter<'a> {
segno: remove_segno,
};
let layer = self.tl.get_layer_for_write(seg, lsn)?;
layer.drop_segment(lsn);
layer.upgrade_to_inmemory_layer().unwrap().drop_segment(lsn);
}
self.tl
.decrease_current_logical_size(oldsize * BLCKSZ as u32);
@@ -1922,7 +2030,7 @@ impl<'a> TimelineWriter for LayeredTimelineWriter<'a> {
// TODO handle TwoPhase relishes
let seg = SegmentTag::from_blknum(rel, 0);
let layer = self.tl.get_layer_for_write(seg, lsn)?;
layer.drop_segment(lsn);
layer.upgrade_to_inmemory_layer().unwrap().drop_segment(lsn);
}
Ok(())

View File

@@ -6,8 +6,8 @@ use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize)]
pub struct BlobRange {
offset: u64,
size: usize,
pub offset: u64,
pub size: usize,
}
pub fn read_blob(reader: &BoundedReader<&'_ File>, range: &BlobRange) -> Result<Vec<u8>> {

View File

@@ -42,6 +42,7 @@ use crate::layered_repository::filename::{DeltaFileName, PathOrConf};
use crate::layered_repository::storage_layer::{
Layer, PageReconstructData, PageReconstructResult, PageVersion, SegmentTag,
};
use crate::vfd::VirtualFile;
use crate::waldecoder;
use crate::PageServerConf;
use crate::{ZTenantId, ZTimelineId};
@@ -145,9 +146,15 @@ pub struct DeltaLayerInner {
/// `relsizes` tracks the size of the relation at different points in time.
relsizes: VecMap<Lsn, u32>,
vfile: VirtualFile,
}
impl Layer for DeltaLayer {
fn get_tenant_id(&self) -> ZTenantId {
self.tenantid
}
fn get_timeline_id(&self) -> ZTimelineId {
self.timelineid
}
@@ -186,9 +193,11 @@ impl Layer for DeltaLayer {
{
// Open the file and lock the metadata in memory
// TODO: avoid opening the file for each read
let (_path, book) = self.open_book()?;
let mut inner = self.load()?;
let file = inner.vfile.open()?;
let book = Book::new(file)?;
let page_version_reader = book.chapter_reader(PAGE_VERSIONS_CHAPTER)?;
let inner = self.load()?;
// Scan the metadata BTreeMap backwards, starting from the given entry.
let minkey = (blknum, Lsn(0));
@@ -221,6 +230,9 @@ impl Layer for DeltaLayer {
}
// release metadata lock and close the file
let file = book.close();
inner.vfile.cache(file);
}
// If an older page image is needed to reconstruct the page, let the
@@ -365,6 +377,18 @@ impl DeltaLayer {
assert!(!relsizes.is_empty());
}
let path = Self::path_for(
&PathOrConf::Conf(conf),
timelineid,
tenantid,
&DeltaFileName {
seg: seg,
start_lsn: start_lsn,
end_lsn: end_lsn,
dropped: dropped,
}
);
let delta_layer = DeltaLayer {
path_or_conf: PathOrConf::Conf(conf),
timelineid,
@@ -377,6 +401,7 @@ impl DeltaLayer {
loaded: true,
page_version_metas: VecMap::default(),
relsizes,
vfile: VirtualFile::new(&path),
}),
};
let mut inner = delta_layer.inner.lock().unwrap();
@@ -496,11 +521,9 @@ impl DeltaLayer {
debug!("loaded from {}", &path.display());
*inner = DeltaLayerInner {
loaded: true,
page_version_metas,
relsizes,
};
inner.loaded = true;
inner.page_version_metas = page_version_metas;
inner.relsizes = relsizes;
Ok(inner)
}
@@ -512,6 +535,13 @@ impl DeltaLayer {
tenantid: ZTenantId,
filename: &DeltaFileName,
) -> DeltaLayer {
let path = Self::path_for(
&PathOrConf::Conf(conf),
timelineid,
tenantid,
&filename,
);
DeltaLayer {
path_or_conf: PathOrConf::Conf(conf),
timelineid,
@@ -524,6 +554,7 @@ impl DeltaLayer {
loaded: false,
page_version_metas: VecMap::default(),
relsizes: VecMap::default(),
vfile: VirtualFile::new(&path),
}),
}
}
@@ -534,7 +565,7 @@ impl DeltaLayer {
pub fn new_for_path(path: &Path, book: &Book<File>) -> Result<Self> {
let chapter = book.read_chapter(SUMMARY_CHAPTER)?;
let summary = Summary::des(&chapter)?;
Ok(DeltaLayer {
path_or_conf: PathOrConf::Path(path.to_path_buf()),
timelineid: summary.timelineid,
@@ -547,6 +578,7 @@ impl DeltaLayer {
loaded: false,
page_version_metas: VecMap::default(),
relsizes: VecMap::default(),
vfile: VirtualFile::new(path),
}),
})
}

View File

@@ -29,6 +29,7 @@ use crate::layered_repository::LayeredTimeline;
use crate::layered_repository::RELISH_SEG_SIZE;
use crate::PageServerConf;
use crate::{ZTenantId, ZTimelineId};
use crate::vfd::VirtualFile;
use anyhow::{anyhow, bail, ensure, Result};
use bytes::Bytes;
use log::*;
@@ -36,7 +37,7 @@ use serde::{Deserialize, Serialize};
use std::convert::TryInto;
use std::fs;
use std::fs::File;
use std::io::{BufWriter, Write};
use std::io::{BufWriter, Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::sync::{Mutex, MutexGuard};
@@ -110,6 +111,8 @@ pub struct ImageLayerInner {
/// Derived from filename and bookfile chapter metadata
image_type: ImageType,
vfile: VirtualFile,
}
impl Layer for ImageLayer {
@@ -117,6 +120,10 @@ impl Layer for ImageLayer {
PathBuf::from(self.layer_name().to_string())
}
fn get_tenant_id(&self) -> ZTenantId {
self.tenantid
}
fn get_timeline_id(&self) -> ZTimelineId {
self.timelineid
}
@@ -147,11 +154,12 @@ impl Layer for ImageLayer {
) -> Result<PageReconstructResult> {
assert!(lsn >= self.lsn);
let inner = self.load()?;
let mut inner = self.load()?;
let base_blknum = blknum % RELISH_SEG_SIZE;
let (_path, book) = self.open_book()?;
let mut file = inner.vfile.open()?;
let mut book = Book::new(&mut file)?;
let buf = match &inner.image_type {
ImageType::Blocky { num_blocks } => {
@@ -162,17 +170,20 @@ impl Layer for ImageLayer {
let mut buf = vec![0u8; BLOCK_SIZE];
let offset = BLOCK_SIZE as u64 * base_blknum as u64;
let chapter = book.chapter_reader(BLOCKY_IMAGES_CHAPTER)?;
chapter.read_exact_at(&mut buf, offset)?;
let mut chapter = book.exclusive_chapter_reader(BLOCKY_IMAGES_CHAPTER)?;
chapter.seek(SeekFrom::Start(offset))?;
chapter.read_exact(&mut buf)?;
buf
}
ImageType::NonBlocky => {
ensure!(base_blknum == 0);
book.read_chapter(NONBLOCKY_IMAGE_CHAPTER)?.into_vec()
book.exclusive_read_chapter(NONBLOCKY_IMAGE_CHAPTER)?.into_vec()
}
};
inner.vfile.cache(file);
reconstruct_data.page_img = Some(Bytes::from(buf));
Ok(PageReconstructResult::Complete)
}
@@ -266,6 +277,16 @@ impl ImageLayer {
ImageType::NonBlocky
};
let path = Self::path_for(
&PathOrConf::Conf(conf),
timelineid,
tenantid,
&ImageFileName {
seg: seg,
lsn: lsn,
}
);
let layer = ImageLayer {
path_or_conf: PathOrConf::Conf(conf),
timelineid,
@@ -275,12 +296,12 @@ impl ImageLayer {
inner: Mutex::new(ImageLayerInner {
loaded: true,
image_type: image_type.clone(),
vfile: VirtualFile::new(&path),
}),
};
let inner = layer.inner.lock().unwrap();
// Write the images into a file
let path = layer.path();
// Note: This overwrites any existing file. There shouldn't be any.
// FIXME: throw an error instead?
let file = File::create(&path)?;
@@ -374,7 +395,8 @@ impl ImageLayer {
return Ok(inner);
}
let (path, book) = self.open_book()?;
let book = Book::new(inner.vfile.open()?)?;
match &self.path_or_conf {
PathOrConf::Conf(_) => {
@@ -412,12 +434,10 @@ impl ImageLayer {
ImageType::NonBlocky
};
debug!("loaded from {}", &path.display());
debug!("loaded from {}", &self.path().display());
*inner = ImageLayerInner {
loaded: true,
image_type,
};
inner.loaded = true;
inner.image_type = image_type;
Ok(inner)
}
@@ -438,6 +458,14 @@ impl ImageLayer {
tenantid: ZTenantId,
filename: &ImageFileName,
) -> ImageLayer {
let path = Self::path_for(
&PathOrConf::Conf(conf),
timelineid,
tenantid,
filename,
);
ImageLayer {
path_or_conf: PathOrConf::Conf(conf),
timelineid,
@@ -447,6 +475,7 @@ impl ImageLayer {
inner: Mutex::new(ImageLayerInner {
loaded: false,
image_type: ImageType::Blocky { num_blocks: 0 },
vfile: VirtualFile::new(&path),
}),
}
}
@@ -467,6 +496,7 @@ impl ImageLayer {
inner: Mutex::new(ImageLayerInner {
loaded: false,
image_type: ImageType::Blocky { num_blocks: 0 },
vfile: VirtualFile::new(path),
}),
})
}

View File

@@ -90,6 +90,11 @@ impl InMemoryLayerInner {
}
impl Layer for InMemoryLayer {
fn upgrade_to_inmemory_layer(&self) -> Option<&InMemoryLayer> {
Some(self)
}
// An in-memory layer doesn't really have a filename as it's not stored on disk,
// but we construct a filename as if it was a delta layer
fn filename(&self) -> PathBuf {
@@ -113,6 +118,10 @@ impl Layer for InMemoryLayer {
PathBuf::from(format!("inmem-{}", delta_filename))
}
fn get_tenant_id(&self) -> ZTenantId {
self.tenantid
}
fn get_timeline_id(&self) -> ZTimelineId {
self.timelineid
}

View File

@@ -41,23 +41,22 @@
use std::collections::BTreeMap;
use std::fmt::Debug;
use std::ops::Range;
use std::sync::Arc;
pub struct IntervalTree<I: ?Sized>
pub struct IntervalTree<I>
where
I: IntervalItem,
{
points: BTreeMap<I::Key, Point<I>>,
}
struct Point<I: ?Sized> {
struct Point<I> {
/// All intervals that contain this point, in no particular order.
///
/// We assume that there aren't a lot of overlappingg intervals, so that this vector
/// never grows very large. If that assumption doesn't hold, we could keep this ordered
/// by the end bound, to speed up `search`. But as long as there are only a few elements,
/// a linear search is OK.
elements: Vec<Arc<I>>,
elements: Vec<I>,
}
/// Abstraction for an interval that can be stored in the tree
@@ -75,14 +74,14 @@ pub trait IntervalItem {
}
}
impl<I: ?Sized> IntervalTree<I>
impl<I> IntervalTree<I>
where
I: IntervalItem,
I: IntervalItem + PartialEq + Clone,
{
/// Return an element that contains 'key', or precedes it.
///
/// If there are multiple candidates, returns the one with the highest 'end' key.
pub fn search(&self, key: I::Key) -> Option<Arc<I>> {
pub fn search(&self, key: I::Key) -> Option<&I> {
// Find the greatest point that precedes or is equal to the search key. If there is
// none, returns None.
let (_, p) = self.points.range(..=key).next_back()?;
@@ -100,7 +99,7 @@ where
}
})
.unwrap();
Some(Arc::clone(highest_item))
Some(highest_item)
}
/// Iterate over all items with start bound >= 'key'
@@ -119,7 +118,7 @@ where
}
}
pub fn insert(&mut self, item: Arc<I>) {
pub fn insert(&mut self, item: &I) {
let start_key = item.start_key();
let end_key = item.end_key();
assert!(start_key < end_key);
@@ -133,18 +132,18 @@ where
found_start_point = true;
// It is an error to insert the same item to the tree twice.
assert!(
!point.elements.iter().any(|x| Arc::ptr_eq(x, &item)),
!point.elements.iter().any(|x| x == item),
"interval is already in the tree"
);
}
point.elements.push(Arc::clone(&item));
point.elements.push(item.clone());
}
if !found_start_point {
// Create a new Point for the starting point
// Look at the previous point, and copy over elements that overlap with this
// new point
let mut new_elements: Vec<Arc<I>> = Vec::new();
let mut new_elements: Vec<I> = Vec::new();
if let Some((_, prev_point)) = self.points.range(..start_key).next_back() {
let overlapping_prev_elements = prev_point
.elements
@@ -154,7 +153,7 @@ where
new_elements.extend(overlapping_prev_elements);
}
new_elements.push(item);
new_elements.push(item.clone());
let new_point = Point {
elements: new_elements,
@@ -163,7 +162,7 @@ where
}
}
pub fn remove(&mut self, item: &Arc<I>) {
pub fn remove(&mut self, item: &I) {
// range search points
let start_key = item.start_key();
let end_key = item.end_key();
@@ -176,7 +175,7 @@ where
found_start_point = true;
}
let len_before = point.elements.len();
point.elements.retain(|other| !Arc::ptr_eq(other, item));
point.elements.retain(|other| other != item);
let len_after = point.elements.len();
assert_eq!(len_after + 1, len_before);
if len_after == 0 {
@@ -191,19 +190,19 @@ where
}
}
pub struct IntervalIter<'a, I: ?Sized>
pub struct IntervalIter<'a, I>
where
I: IntervalItem,
{
point_iter: std::collections::btree_map::Range<'a, I::Key, Point<I>>,
elem_iter: Option<(I::Key, std::slice::Iter<'a, Arc<I>>)>,
elem_iter: Option<(I::Key, std::slice::Iter<'a, I>)>,
}
impl<'a, I> Iterator for IntervalIter<'a, I>
where
I: IntervalItem + ?Sized,
I: IntervalItem,
{
type Item = Arc<I>;
type Item = &'a I;
fn next(&mut self) -> Option<Self::Item> {
// Iterate over all elements in all the points in 'point_iter'. To avoid
@@ -214,7 +213,7 @@ where
if let Some((point_key, elem_iter)) = &mut self.elem_iter {
for elem in elem_iter {
if elem.start_key() == *point_key {
return Some(Arc::clone(elem));
return Some(elem);
}
}
}
@@ -230,7 +229,7 @@ where
}
}
impl<I: ?Sized> Default for IntervalTree<I>
impl<I> Default for IntervalTree<I>
where
I: IntervalItem,
{
@@ -246,7 +245,7 @@ mod tests {
use super::*;
use std::fmt;
#[derive(Debug)]
#[derive(Debug, Clone, PartialEq)]
struct MockItem {
start_key: u32,
end_key: u32,
@@ -288,7 +287,7 @@ mod tests {
tree: &IntervalTree<MockItem>,
key: u32,
expected: &[&str],
) -> Option<Arc<MockItem>> {
) -> Option<MockItem> {
if let Some(v) = tree.search(key) {
let vstr = v.to_string();
@@ -299,7 +298,7 @@ mod tests {
key, v, expected,
);
Some(v)
Some(v.clone())
} else {
assert!(
expected.is_empty(),
@@ -331,12 +330,12 @@ mod tests {
let mut tree: IntervalTree<MockItem> = IntervalTree::default();
// Simple, non-overlapping ranges.
tree.insert(Arc::new(MockItem::new(10, 11)));
tree.insert(Arc::new(MockItem::new(11, 12)));
tree.insert(Arc::new(MockItem::new(12, 13)));
tree.insert(Arc::new(MockItem::new(18, 19)));
tree.insert(Arc::new(MockItem::new(17, 18)));
tree.insert(Arc::new(MockItem::new(15, 16)));
tree.insert(&MockItem::new(10, 11));
tree.insert(&MockItem::new(11, 12));
tree.insert(&MockItem::new(12, 13));
tree.insert(&MockItem::new(18, 19));
tree.insert(&MockItem::new(17, 18));
tree.insert(&MockItem::new(15, 16));
assert_search(&tree, 9, &[]);
assert_search(&tree, 10, &["10-11"]);
@@ -370,13 +369,13 @@ mod tests {
let mut tree: IntervalTree<MockItem> = IntervalTree::default();
// Overlapping items
tree.insert(Arc::new(MockItem::new(22, 24)));
tree.insert(Arc::new(MockItem::new(23, 25)));
let x24_26 = Arc::new(MockItem::new(24, 26));
tree.insert(Arc::clone(&x24_26));
let x26_28 = Arc::new(MockItem::new(26, 28));
tree.insert(Arc::clone(&x26_28));
tree.insert(Arc::new(MockItem::new(25, 27)));
tree.insert(&MockItem::new(22, 24));
tree.insert(&MockItem::new(23, 25));
let x24_26 = MockItem::new(24, 26);
tree.insert(&x24_26);
let x26_28 = MockItem::new(26, 28);
tree.insert(&x26_28);
tree.insert(&MockItem::new(25, 27));
assert_search(&tree, 22, &["22-24"]);
assert_search(&tree, 23, &["22-24", "23-25"]);
@@ -403,10 +402,10 @@ mod tests {
let mut tree: IntervalTree<MockItem> = IntervalTree::default();
// Items containing other items
tree.insert(Arc::new(MockItem::new(31, 39)));
tree.insert(Arc::new(MockItem::new(32, 34)));
tree.insert(Arc::new(MockItem::new(33, 35)));
tree.insert(Arc::new(MockItem::new(30, 40)));
tree.insert(&MockItem::new(31, 39));
tree.insert(&MockItem::new(32, 34));
tree.insert(&MockItem::new(33, 35));
tree.insert(&MockItem::new(30, 40));
assert_search(&tree, 30, &["30-40"]);
assert_search(&tree, 31, &["30-40", "31-39"]);
@@ -427,16 +426,16 @@ mod tests {
let mut tree: IntervalTree<MockItem> = IntervalTree::default();
// Duplicate keys
let item_a = Arc::new(MockItem::new_str(55, 56, "a"));
tree.insert(Arc::clone(&item_a));
let item_b = Arc::new(MockItem::new_str(55, 56, "b"));
tree.insert(Arc::clone(&item_b));
let item_c = Arc::new(MockItem::new_str(55, 56, "c"));
tree.insert(Arc::clone(&item_c));
let item_d = Arc::new(MockItem::new_str(54, 56, "d"));
tree.insert(Arc::clone(&item_d));
let item_e = Arc::new(MockItem::new_str(55, 57, "e"));
tree.insert(Arc::clone(&item_e));
let item_a = MockItem::new_str(55, 56, "a");
tree.insert(&item_a);
let item_b = MockItem::new_str(55, 56, "b");
tree.insert(&item_b);
let item_c = MockItem::new_str(55, 56, "c");
tree.insert(&item_c);
let item_d = MockItem::new_str(54, 56, "d");
tree.insert(&item_d);
let item_e = MockItem::new_str(55, 57, "e");
tree.insert(&item_e);
dump_tree(&tree);
@@ -461,8 +460,8 @@ mod tests {
let mut tree: IntervalTree<MockItem> = IntervalTree::default();
// Inserting the same item twice is not cool
let item = Arc::new(MockItem::new(1, 2));
tree.insert(Arc::clone(&item));
tree.insert(Arc::clone(&item)); // fails assertion
let item = MockItem::new(1, 2);
tree.insert(&item);
tree.insert(&item); // fails assertion
}
}

View File

@@ -0,0 +1,129 @@
use crate::tenant_mgr;
use crate::layered_repository::layer_map;
use crate::PageServerConf;
use anyhow::Result;
use lazy_static::lazy_static;
use tracing::*;
use std::collections::VecDeque;
use std::sync::Mutex;
use std::thread::JoinHandle;
use zenith_utils::zid::{ZTenantId, ZTimelineId};
lazy_static! {
static ref JOB_QUEUE: Mutex<GlobalJobQueue> = Mutex::new(GlobalJobQueue::default());
}
#[derive(Default)]
struct GlobalJobQueue {
jobs: VecDeque<GlobalJob>,
}
pub enum GlobalJob {
// To release memory
EvictSomeLayer,
// To advance 'disk_consistent_lsn'
CheckpointTimeline(ZTenantId, ZTimelineId),
// To free up disk space
GarbageCollect(ZTenantId),
}
pub fn schedule_job(job: GlobalJob) {
let mut queue = JOB_QUEUE.lock().unwrap();
queue.jobs.push_back(job);
}
///
/// Launch the global job handler thread
///
/// TODO: This ought to be a pool of threads
///
pub fn launch_global_job_thread(conf: &'static PageServerConf) -> JoinHandle<()> {
std::thread::Builder::new()
.name("Global Job thread".into())
.spawn(move || {
// FIXME: relaunch it? Panic is not good.
global_job_loop(conf).expect("Global job thread died");
})
.unwrap()
}
pub fn global_job_loop(conf: &'static PageServerConf) -> Result<()> {
while !tenant_mgr::shutdown_requested() {
std::thread::sleep(conf.checkpoint_period);
info!("global job thread waking up");
let mut queue = JOB_QUEUE.lock().unwrap();
while let Some(job) = queue.jobs.pop_front() {
drop(queue);
let result = match job {
GlobalJob::EvictSomeLayer => {
evict_layer()
},
GlobalJob::CheckpointTimeline(tenantid, timelineid) => {
checkpoint_timeline(tenantid, timelineid)
}
GlobalJob::GarbageCollect(tenantid) => {
gc_tenant(tenantid)
}
};
if let Err(err) = result {
error!("job ended in error: {:#}", err);
}
queue = JOB_QUEUE.lock().unwrap();
}
}
trace!("Checkpointer thread shut down");
Ok(())
}
// Freeze and write out an in-memory layer
fn evict_layer() -> Result<()>
{
// Pick a victim
while let Some(layer_id) = layer_map::find_victim() {
let victim_layer = match layer_map::get_layer(layer_id) {
Some(l) => l,
None => continue,
};
let tenantid = victim_layer.get_tenant_id();
let timelineid = victim_layer.get_timeline_id();
let _entered = info_span!("global evict", timeline = %timelineid, tenant = %tenantid)
.entered();
info!("evicting {}", victim_layer.filename().display());
drop(victim_layer);
let timeline = tenant_mgr::get_timeline_for_tenant(tenantid, timelineid)?;
timeline.upgrade_to_layered_timeline().evict_layer(layer_id)?
}
info!("no more eviction needed");
Ok(())
}
fn checkpoint_timeline(tenantid: ZTenantId, timelineid: ZTimelineId) -> Result<()> {
let timeline = tenant_mgr::get_timeline_for_tenant(tenantid, timelineid)?;
timeline.checkpoint_scheduled()
}
fn gc_tenant(tenantid: ZTenantId) -> Result<()> {
let tenant = tenant_mgr::get_repository_for_tenant(tenantid)?;
tenant.gc_scheduled()?;
Ok(())
}

View File

@@ -9,6 +9,23 @@
//! new image and delta layers and corresponding files are written to disk.
//!
//
// Global layer registry:
//
// Every layer is inserted into the global registry, and assigned an ID
//
// The global registry tracks memory usage and usage count for each layer
//
//
// In addition to that, there is a per-timeline LayerMap, used for lookups
//
//
use crate::layered_repository::{schedule_job, GlobalJob};
use crate::layered_repository::interval_tree::{IntervalItem, IntervalIter, IntervalTree};
use crate::layered_repository::storage_layer::{Layer, SegmentTag};
use crate::layered_repository::InMemoryLayer;
@@ -17,7 +34,8 @@ use anyhow::Result;
use lazy_static::lazy_static;
use std::cmp::Ordering;
use std::collections::{BinaryHeap, HashMap};
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use tracing::*;
use zenith_metrics::{register_int_gauge, IntGauge};
use zenith_utils::lsn::Lsn;
@@ -28,6 +46,176 @@ lazy_static! {
static ref NUM_ONDISK_LAYERS: IntGauge =
register_int_gauge!("pageserver_ondisk_layers", "Number of layers on-disk")
.expect("failed to define a metric");
// Global layer map
static ref LAYERS: Mutex<GlobalLayerMap> = Mutex::new(GlobalLayerMap::new());
}
const MAX_OPEN_LAYERS: usize = 10;
const MAX_LOADED_LAYERS: usize = 100;
struct GlobalLayerEntry {
tag: u64, // to fix ABA problem
layer: Option<Arc<dyn Layer>>,
usage_count: u32,
}
struct GlobalLayerMap {
open_layers: Vec<GlobalLayerEntry>,
clock_arm: u32,
num_open_layers: usize,
eviction_scheduled: bool,
historic_layers: Vec<GlobalLayerEntry>,
}
impl GlobalLayerMap {
pub fn new() -> GlobalLayerMap {
GlobalLayerMap {
open_layers: Vec::new(),
clock_arm: 0,
historic_layers: Vec::new(),
eviction_scheduled: false,
num_open_layers: 0,
}
}
pub fn get(&mut self, layer_id: LayerId) -> Option<Arc<dyn Layer>> {
let e = if layer_id.is_historic() {
let idx = (layer_id.index - 1) as usize;
&mut self.historic_layers[idx]
} else {
let idx = ((-layer_id.index) - 1) as usize;
&mut self.open_layers[idx]
};
if e.usage_count < 5 {
e.usage_count += 1;
}
e.layer.clone()
}
pub fn insert_open(&mut self, layer: Arc<InMemoryLayer>) -> LayerId {
let index = -(self.open_layers.len() as isize + 1);
let entry = GlobalLayerEntry {
layer: Some(layer),
usage_count: 1,
tag: 1,
};
let tag = entry.tag;
self.open_layers.push(entry);
self.num_open_layers += 1;
NUM_INMEMORY_LAYERS.inc();
if !self.eviction_scheduled && self.num_open_layers >= MAX_OPEN_LAYERS {
info!("scheduling global eviction");
schedule_job(GlobalJob::EvictSomeLayer);
self.eviction_scheduled = true;
}
LayerId {
index,
tag,
}
}
pub fn insert_historic(&mut self, layer: Arc<dyn Layer>) -> LayerId {
let index = self.historic_layers.len() as isize + 1;
let entry = GlobalLayerEntry {
layer: Some(layer),
usage_count: 1,
tag: 1,
};
let tag = entry.tag;
self.historic_layers.push(entry);
NUM_ONDISK_LAYERS.inc();
LayerId {
index,
tag,
}
}
pub fn remove(&mut self, layer_id: LayerId) -> Option<Arc<dyn Layer>> {
let old_layer;
if layer_id.is_historic() {
let idx = (layer_id.index - 1) as usize;
old_layer = self.historic_layers[idx].layer.take();
if old_layer.is_some() {
NUM_ONDISK_LAYERS.dec();
}
} else {
let idx = ((-layer_id.index) - 1) as usize;
old_layer = self.open_layers[idx].layer.take();
if old_layer.is_some() {
NUM_INMEMORY_LAYERS.dec();
self.num_open_layers -= 1;
}
}
old_layer
}
pub fn find_victim(&mut self) -> Option<LayerId> {
// run the clock algorithm among all open layers
for _ in 0..self.open_layers.len() * 5 {
self.clock_arm += 1;
if self.clock_arm >= self.open_layers.len() as u32 {
self.clock_arm = 0;
}
let next = self.clock_arm as usize;
if self.open_layers[next].usage_count == 0 {
return Some(LayerId {
index: -((next + 1) as isize),
tag: self.open_layers[next].tag,
});
} else {
self.open_layers[next].usage_count -= 1;
}
}
None
}
}
pub fn find_victim() -> Option<LayerId> {
let mut l = LAYERS.lock().unwrap();
if l.num_open_layers >= MAX_OPEN_LAYERS {
if let Some(x) = l.find_victim() {
info!("found victim out of {} open layers", l.num_open_layers);
Some(x)
} else {
info!("no victim found at {} open layers", l.num_open_layers);
None
}
} else {
info!("no victim needed at {} open layers", l.num_open_layers);
l.eviction_scheduled = false;
None
}
}
pub fn get_layer(layer_id: LayerId) -> Option<Arc<dyn Layer>> {
LAYERS.lock().unwrap().get(layer_id)
}
#[derive(Clone, Copy, PartialEq, Eq)]
pub struct LayerId {
index: isize,
tag: u64
}
impl LayerId {
pub fn is_historic(&self) -> bool {
self.index > 0
}
}
///
@@ -41,7 +229,7 @@ pub struct LayerMap {
/// All in-memory layers, ordered by 'oldest_pending_lsn' and generation
/// of each layer. This allows easy access to the in-memory layer that
/// contains the oldest WAL record.
open_layers: BinaryHeap<OpenLayerEntry>,
open_layers: BinaryHeap<OpenLayerHeapEntry>,
/// Generation number, used to distinguish newly inserted entries in the
/// binary heap from older entries during checkpoint.
@@ -61,23 +249,32 @@ impl LayerMap {
segentry.get(lsn)
}
pub fn get_with_id(&self, layer_id: LayerId) -> Option<Arc<dyn Layer>> {
// TODO: check that it belongs to this tenant+timeline
LAYERS.lock().unwrap().get(layer_id)
}
///
/// Get the open layer for given segment for writing. Or None if no open
/// layer exists.
///
pub fn get_open(&self, tag: &SegmentTag) -> Option<Arc<InMemoryLayer>> {
pub fn get_open(&self, tag: &SegmentTag) -> Option<Arc<dyn Layer>> {
let segentry = self.segs.get(tag)?;
segentry.open.as_ref().map(Arc::clone)
let (layer_id, _start_lsn) = segentry.open?;
LAYERS.lock().unwrap().get(layer_id)
}
///
/// Insert an open in-memory layer
///
pub fn insert_open(&mut self, layer: Arc<InMemoryLayer>) {
let layer_id = LAYERS.lock().unwrap().insert_open(Arc::clone(&layer));
let segentry = self.segs.entry(layer.get_seg_tag()).or_default();
segentry.update_open(Arc::clone(&layer));
segentry.update_open(layer_id, layer.get_start_lsn());
let oldest_pending_lsn = layer.get_oldest_pending_lsn();
@@ -87,58 +284,52 @@ impl LayerMap {
assert!(oldest_pending_lsn.is_aligned());
// Also add it to the binary heap
let open_layer_entry = OpenLayerEntry {
let open_layer_entry = OpenLayerHeapEntry {
oldest_pending_lsn: layer.get_oldest_pending_lsn(),
layer,
layer_id,
generation: self.current_generation,
};
self.open_layers.push(open_layer_entry);
NUM_INMEMORY_LAYERS.inc();
}
/// Remove the oldest in-memory layer
pub fn pop_oldest_open(&mut self) {
// Pop it from the binary heap
let oldest_entry = self.open_layers.pop().unwrap();
let segtag = oldest_entry.layer.get_seg_tag();
/// Remove a layer
pub fn remove(&mut self, layer_id: LayerId) {
if let Some(layer) = LAYERS.lock().unwrap().remove(layer_id) {
// Also remove it from the SegEntry of this segment
if layer_id.is_historic() {
let tag = layer.get_seg_tag();
// Also remove it from the SegEntry of this segment
let mut segentry = self.segs.get_mut(&segtag).unwrap();
if Arc::ptr_eq(segentry.open.as_ref().unwrap(), &oldest_entry.layer) {
segentry.open = None;
} else {
// We could have already updated segentry.open for
// dropped (non-writeable) layer. This is fine.
assert!(!oldest_entry.layer.is_writeable());
assert!(oldest_entry.layer.is_dropped());
if let Some(segentry) = self.segs.get_mut(&tag) {
segentry.historic.remove(&HistoricLayerIntervalTreeEntry::new(layer_id, layer));
}
} else {
let segtag = layer.get_seg_tag();
let mut segentry = self.segs.get_mut(&segtag).unwrap();
if let Some(open) = segentry.open {
if open.0 == layer_id {
segentry.open = None;
}
} else {
// We could have already updated segentry.open for
// dropped (non-writeable) layer. This is fine.
//assert!(!layer.is_writeable());
//assert!(layer.is_dropped());
}
}
}
NUM_INMEMORY_LAYERS.dec();
}
///
/// Insert an on-disk layer
///
pub fn insert_historic(&mut self, layer: Arc<dyn Layer>) {
pub fn insert_historic(&mut self, layer: Arc<dyn Layer>) -> LayerId {
let layer_id = LAYERS.lock().unwrap().insert_historic(Arc::clone(&layer));
let segentry = self.segs.entry(layer.get_seg_tag()).or_default();
segentry.insert_historic(layer);
segentry.insert_historic(layer_id, layer);
NUM_ONDISK_LAYERS.inc();
}
///
/// Remove an on-disk layer from the map.
///
/// This should be called when the corresponding file on disk has been deleted.
///
pub fn remove_historic(&mut self, layer: Arc<dyn Layer>) {
let tag = layer.get_seg_tag();
if let Some(segentry) = self.segs.get_mut(&tag) {
segentry.historic.remove(&layer);
}
NUM_ONDISK_LAYERS.dec();
layer_id
}
// List relations along with a flag that marks if they exist at the given lsn.
@@ -199,10 +390,18 @@ impl LayerMap {
}
/// Return the oldest in-memory layer, along with its generation number.
pub fn peek_oldest_open(&self) -> Option<(Arc<InMemoryLayer>, u64)> {
self.open_layers
.peek()
.map(|oldest_entry| (Arc::clone(&oldest_entry.layer), oldest_entry.generation))
pub fn peek_oldest_open(&mut self) -> Option<(LayerId, Arc<dyn Layer>, u64)> {
while let Some(oldest_entry) = self.open_layers.peek() {
if let Some(layer) = LAYERS.lock().unwrap().get(oldest_entry.layer_id) {
return Some((oldest_entry.layer_id,
layer,
oldest_entry.generation));
} else {
self.open_layers.pop();
}
}
None
}
/// Increment the generation number used to stamp open in-memory layers. Layers
@@ -220,6 +419,7 @@ impl LayerMap {
}
}
/*
/// debugging function to print out the contents of the layer map
#[allow(unused)]
pub fn dump(&self) -> Result<()> {
@@ -236,16 +436,39 @@ impl LayerMap {
println!("End dump LayerMap");
Ok(())
}
*/
}
impl IntervalItem for dyn Layer {
#[derive(Clone)]
struct HistoricLayerIntervalTreeEntry {
layer_id: LayerId,
start_lsn: Lsn,
end_lsn: Lsn,
}
impl HistoricLayerIntervalTreeEntry {
fn new(layer_id: LayerId, layer: Arc<dyn Layer>) -> HistoricLayerIntervalTreeEntry{
HistoricLayerIntervalTreeEntry {
layer_id,
start_lsn: layer.get_start_lsn(),
end_lsn: layer.get_end_lsn(),
}
}
}
impl PartialEq for HistoricLayerIntervalTreeEntry {
fn eq(&self, other: &Self) -> bool {
self.layer_id == other.layer_id
}
}
impl IntervalItem for HistoricLayerIntervalTreeEntry {
type Key = Lsn;
fn start_key(&self) -> Lsn {
self.get_start_lsn()
self.start_lsn
}
fn end_key(&self) -> Lsn {
self.get_end_lsn()
self.end_lsn
}
}
@@ -259,8 +482,8 @@ impl IntervalItem for dyn Layer {
/// IntervalTree.
#[derive(Default)]
struct SegEntry {
open: Option<Arc<InMemoryLayer>>,
historic: IntervalTree<dyn Layer>,
open: Option<(LayerId, Lsn)>,
historic: IntervalTree<HistoricLayerIntervalTreeEntry>,
}
impl SegEntry {
@@ -276,13 +499,18 @@ impl SegEntry {
pub fn get(&self, lsn: Lsn) -> Option<Arc<dyn Layer>> {
if let Some(open) = &self.open {
if open.get_start_lsn() <= lsn {
let x: Arc<dyn Layer> = Arc::clone(open) as _;
return Some(x);
if let Some(layer) = LAYERS.lock().unwrap().get(open.0) {
if layer.get_start_lsn() <= lsn {
return Some(layer);
}
}
}
self.historic.search(lsn)
if let Some(historic) = self.historic.search(lsn) {
Some(LAYERS.lock().unwrap().get(historic.layer_id).unwrap())
} else {
None
}
}
pub fn newer_image_layer_exists(&self, lsn: Lsn) -> bool {
@@ -291,21 +519,25 @@ impl SegEntry {
self.historic
.iter_newer(lsn)
.any(|layer| !layer.is_incremental())
.any(|e| {
let layer = LAYERS.lock().unwrap().get(e.layer_id).unwrap();
!layer.is_incremental()
}
)
}
// Set new open layer for a SegEntry.
// It's ok to rewrite previous open layer,
// but only if it is not writeable anymore.
pub fn update_open(&mut self, layer: Arc<InMemoryLayer>) {
if let Some(prev_open) = &self.open {
assert!(!prev_open.is_writeable());
pub fn update_open(&mut self, layer_id: LayerId, start_lsn: Lsn) {
if let Some(_prev_open) = &self.open {
//assert!(!prev_open.is_writeable());
}
self.open = Some(layer);
self.open = Some((layer_id, start_lsn));
}
pub fn insert_historic(&mut self, layer: Arc<dyn Layer>) {
self.historic.insert(layer);
pub fn insert_historic(&mut self, layer_id: LayerId, layer: Arc<dyn Layer>) {
self.historic.insert(&HistoricLayerIntervalTreeEntry::new(layer_id, layer));
}
}
@@ -315,12 +547,12 @@ impl SegEntry {
/// The generation number associated with each entry can be used to distinguish
/// recently-added entries (i.e after last call to increment_generation()) from older
/// entries with the same 'oldest_pending_lsn'.
struct OpenLayerEntry {
struct OpenLayerHeapEntry {
pub oldest_pending_lsn: Lsn, // copy of layer.get_oldest_pending_lsn()
pub generation: u64,
pub layer: Arc<InMemoryLayer>,
pub layer_id: LayerId,
}
impl Ord for OpenLayerEntry {
impl Ord for OpenLayerHeapEntry {
fn cmp(&self, other: &Self) -> Ordering {
// BinaryHeap is a max-heap, and we want a min-heap. Reverse the ordering here
// to get that. Entries with identical oldest_pending_lsn are ordered by generation
@@ -330,32 +562,33 @@ impl Ord for OpenLayerEntry {
.then_with(|| other.generation.cmp(&self.generation))
}
}
impl PartialOrd for OpenLayerEntry {
impl PartialOrd for OpenLayerHeapEntry {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl PartialEq for OpenLayerEntry {
impl PartialEq for OpenLayerHeapEntry {
fn eq(&self, other: &Self) -> bool {
self.cmp(other) == Ordering::Equal
}
}
impl Eq for OpenLayerEntry {}
impl Eq for OpenLayerHeapEntry {}
/// Iterator returned by LayerMap::iter_historic_layers()
pub struct HistoricLayerIter<'a> {
seg_iter: std::collections::hash_map::Iter<'a, SegmentTag, SegEntry>,
iter: Option<IntervalIter<'a, dyn Layer>>,
iter: Option<IntervalIter<'a, HistoricLayerIntervalTreeEntry>>,
}
impl<'a> Iterator for HistoricLayerIter<'a> {
type Item = Arc<dyn Layer>;
type Item = (LayerId, Arc<dyn Layer>);
fn next(&mut self) -> std::option::Option<<Self as std::iter::Iterator>::Item> {
loop {
if let Some(x) = &mut self.iter {
if let Some(x) = x.next() {
return Some(Arc::clone(&x));
let layer = LAYERS.lock().unwrap().get(x.layer_id).unwrap();
return Some((x.layer_id, layer));
}
}
if let Some((_tag, segentry)) = self.seg_iter.next() {
@@ -426,10 +659,10 @@ mod tests {
// A helper function (closure) to pop the next oldest open entry from the layer map,
// and assert that it is what we'd expect
let mut assert_pop_layer = |expected_segno: u32, expected_generation: u64| {
let (l, generation) = layers.peek_oldest_open().unwrap();
let (layer_id, l, generation) = layers.peek_oldest_open().unwrap();
assert!(l.get_seg_tag().segno == expected_segno);
assert!(generation == expected_generation);
layers.pop_oldest_open();
layers.remove(layer_id);
};
assert_pop_layer(0, gen1); // 0x100

View File

@@ -2,9 +2,10 @@
//! Common traits and structs for layers
//!
use crate::layered_repository::InMemoryLayer;
use crate::relish::RelishTag;
use crate::repository::WALRecord;
use crate::ZTimelineId;
use crate::{ZTenantId, ZTimelineId};
use anyhow::Result;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
@@ -104,6 +105,14 @@ pub enum PageReconstructResult {
/// in-memory and on-disk layers.
///
pub trait Layer: Send + Sync {
fn upgrade_to_inmemory_layer(&self) -> Option<&InMemoryLayer> {
None
}
/// Identify the timeline this relish belongs to
fn get_tenant_id(&self) -> ZTenantId;
/// Identify the timeline this relish belongs to
fn get_timeline_id(&self) -> ZTimelineId;

View File

@@ -21,6 +21,7 @@ pub mod tenant_mgr;
pub mod waldecoder;
pub mod walreceiver;
pub mod walredo;
pub mod vfd;
pub mod defaults {
use const_format::formatcp;

View File

@@ -690,7 +690,7 @@ impl postgres_backend::Handler for PageServerHandler {
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
let result = repo.gc_iteration(Some(timelineid), gc_horizon, true)?;
let result = repo.gc_manual(Some(timelineid), gc_horizon, true)?;
pgb.write_message_noflush(&BeMessage::RowDescription(&[
RowDescriptor::int8_col(b"layer_relfiles_total"),

View File

@@ -33,12 +33,16 @@ pub trait Repository: Send + Sync {
/// `checkpoint_before_gc` parameter is used to force compaction of storage before CG
/// to make tests more deterministic.
/// TODO Do we still need it or we can call checkpoint explicitly in tests where needed?
fn gc_iteration(
fn gc_manual(
&self,
timelineid: Option<ZTimelineId>,
horizon: u64,
checkpoint_before_gc: bool,
) -> Result<GcResult>;
fn gc_scheduled(&self) -> Result<GcResult>;
fn upgrade_to_layered_repository(&self) -> &crate::layered_repository::LayeredRepository;
}
///
@@ -144,7 +148,9 @@ pub trait Timeline: Send + Sync {
///
/// NOTE: This has nothing to do with checkpoint in PostgreSQL. We don't
/// know anything about them here in the repository.
fn checkpoint(&self) -> Result<()>;
fn checkpoint_forced(&self) -> Result<()>;
fn checkpoint_scheduled(&self) -> Result<()>;
/// Retrieve current logical size of the timeline
///
@@ -155,6 +161,8 @@ pub trait Timeline: Send + Sync {
/// Does the same as get_current_logical_size but counted on demand.
/// Used in tests to ensure thet incremental and non incremental variants match.
fn get_current_logical_size_non_incremental(&self, lsn: Lsn) -> Result<usize>;
fn upgrade_to_layered_timeline(&self) -> &crate::layered_repository::LayeredTimeline;
}
/// Various functions to mutate the timeline.
@@ -714,8 +722,8 @@ mod tests {
.contains(&TESTREL_A));
// Run checkpoint and garbage collection and check that it's still not visible
newtline.checkpoint()?;
repo.gc_iteration(Some(NEW_TIMELINE_ID), 0, true)?;
newtline.checkpoint_forced()?;
repo.gc_manual(Some(NEW_TIMELINE_ID), 0, true)?;
assert!(!newtline
.list_rels(0, TESTDB, Lsn(0x40))?

View File

@@ -106,17 +106,6 @@ fn init_repo(conf: &'static PageServerConf, tenant_id: ZTenantId) {
true,
));
let checkpointer_handle = LayeredRepository::launch_checkpointer_thread(conf, repo.clone());
let gc_handle = LayeredRepository::launch_gc_thread(conf, repo.clone());
let mut handles = TENANT_HANDLES.lock().unwrap();
let h = TenantHandleEntry {
checkpointer_handle: Some(checkpointer_handle),
gc_handle: Some(gc_handle),
};
handles.insert(tenant_id, h);
let mut m = access_tenants();
let tenant = m.get_mut(&tenant_id).unwrap();
tenant.repo = Some(repo);

155
pageserver/src/vfd.rs Normal file
View File

@@ -0,0 +1,155 @@
use std::fs::File;
use std::io::Seek;
use std::path::{Path, PathBuf};
use std::sync::Mutex;
use lazy_static::lazy_static;
const INVALID_TAG: u64 = u64::MAX;
struct OpenFiles {
next: usize,
files: Vec<OpenFile>,
}
lazy_static! {
static ref OPEN_FILES: Mutex<OpenFiles> = Mutex::new(OpenFiles {
next: 0,
files: Vec::new(),
});
}
struct OpenFile {
tag: u64,
file: Option<File>,
}
pub struct VirtualFile {
vfd: usize,
tag: u64,
path: PathBuf,
}
impl VirtualFile {
pub fn new(path: &Path) -> VirtualFile {
VirtualFile {
vfd: 0,
tag: INVALID_TAG,
path: path.to_path_buf(),
}
}
pub fn open(&mut self) -> std::io::Result<File> {
let mut l = OPEN_FILES.lock().unwrap();
if self.vfd < l.files.len() && l.files[self.vfd].tag == self.tag {
if let Some(mut file) = l.files[self.vfd].file.take() {
// return cached File
file.rewind()?;
return Ok(file);
}
}
File::open(&self.path)
}
pub fn cache(&mut self, file: File) {
let mut l = OPEN_FILES.lock().unwrap();
let next = if l.next >= l.files.len() {
if l.files.len() < 100 {
l.files.push(OpenFile {
tag: 0,
file: None
});
l.files.len() - 1
} else {
// wrap around
0
}
} else {
l.next
};
l.next = next + 1;
l.files[next].file.replace(file);
l.files[next].tag += 1;
self.vfd = next;
self.tag = l.files[next].tag;
drop(l);
}
}
impl Drop for VirtualFile {
fn drop(&mut self) {
// Close file if it's still open
if self.tag != INVALID_TAG {
let mut l = OPEN_FILES.lock().unwrap();
if self.vfd < l.files.len() && l.files[self.vfd].tag == self.tag {
l.files[self.vfd].file.take();
}
}
}
}
#[cfg(test)]
mod tests {
use crate::PageServerConf;
use super::*;
use std::io::Read;
#[test]
fn test_vfd() -> anyhow::Result<()> {
let mut vfiles = Vec::new();
let test_dir = PageServerConf::test_repo_dir("test_vfd");
let _ = std::fs::remove_dir_all(&test_dir);
std::fs::create_dir_all(&test_dir)?;
for i in 0..2000 {
let path = test_dir.join(format!("vfd_test{}", i));
let content = format!("foobar{}", i);
std::fs::write(&path, &content)?;
let vfile = VirtualFile::new(&path);
vfiles.push((vfile, path, content));
}
for i in 0..vfiles.len() {
let (ref mut vfile, _path, expected_content) = &mut vfiles[i];
let mut s = String::new();
let mut file = vfile.open()?;
file.read_to_string(&mut s)?;
assert!(&s == expected_content);
vfile.cache(file);
s.clear();
let (ref mut vfile, _path, expected_content) = &mut vfiles[0];
let mut file = vfile.open()?;
file.read_to_string(&mut s)?;
assert!(&s == expected_content);
vfile.cache(file);
}
Ok(())
}
}

View File

@@ -251,6 +251,9 @@ fn walreceiver_main(
last_rec_lsn = lsn;
}
timeline.upgrade_to_layered_timeline().schedule_checkpoint_if_needed()?;
timeline.upgrade_to_layered_timeline().schedule_gc_if_needed()?;
if !caught_up && endlsn >= end_of_wal {
info!("caught up at LSN {}", endlsn);
caught_up = true;

View File

@@ -28,19 +28,21 @@ use std::fs::OpenOptions;
use std::io::prelude::*;
use std::io::Error;
use std::path::PathBuf;
use std::process::{ChildStdin, ChildStdout, ChildStderr, Command};
use std::process::Stdio;
use std::sync::Mutex;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use std::time::Instant;
use tokio::io::AsyncBufReadExt;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::process::{ChildStdin, ChildStdout, Command};
use tokio::time::timeout;
use zenith_metrics::{register_histogram, register_int_counter, Histogram, IntCounter};
use zenith_utils::bin_ser::BeSer;
use zenith_utils::lsn::Lsn;
use zenith_utils::zid::ZTenantId;
use std::os::unix::io::AsRawFd;
use nix::poll::*;
use crate::relish::*;
use crate::repository::WALRecord;
use crate::waldecoder::XlMultiXactCreate;
@@ -133,14 +135,14 @@ lazy_static! {
/// perform WAL replay. Only one thread can use the processs at a time,
/// that is controlled by the Mutex. In the future, we might want to
/// launch a pool of processes to allow concurrent replay of multiple
/// records.
/// records. FIXME we have a pool now
///
pub struct PostgresRedoManager {
tenantid: ZTenantId,
conf: &'static PageServerConf,
runtime: tokio::runtime::Runtime,
process: Mutex<Option<PostgresRedoProcess>>,
processes: Vec<Mutex<Option<PostgresRedoProcess>>>,
next: AtomicUsize,
}
#[derive(Debug)]
@@ -210,21 +212,18 @@ impl WalRedoManager for PostgresRedoManager {
end_time = Instant::now();
WAL_REDO_TIME.observe(end_time.duration_since(start_time).as_secs_f64());
} else {
let mut process_guard = self.process.lock().unwrap();
let process_no = self.next.fetch_add(1, Ordering::AcqRel) % self.processes.len();
let mut process_guard = self.processes[process_no].lock().unwrap();
let lock_time = Instant::now();
// launch the WAL redo process on first use
if process_guard.is_none() {
let p = self
.runtime
.block_on(PostgresRedoProcess::launch(self.conf, &self.tenantid))?;
let p = PostgresRedoProcess::launch(self.conf, process_no, &self.tenantid)?;
*process_guard = Some(p);
}
let process = process_guard.as_mut().unwrap();
result = self
.runtime
.block_on(self.handle_apply_request_postgres(process, &request));
result = self.handle_apply_request_postgres(process, &request);
WAL_REDO_WAIT_TIME.observe(lock_time.duration_since(start_time).as_secs_f64());
end_time = Instant::now();
@@ -240,27 +239,24 @@ impl PostgresRedoManager {
/// Create a new PostgresRedoManager.
///
pub fn new(conf: &'static PageServerConf, tenantid: ZTenantId) -> PostgresRedoManager {
// We block on waiting for requests on the walredo request channel, but
// use async I/O to communicate with the child process. Initialize the
// runtime for the async part.
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let mut processes: Vec<Mutex<Option<PostgresRedoProcess>>> = Vec::new();
for _ in 1..10 {
processes.push(Mutex::new(None));
}
// The actual process is launched lazily, on first request.
PostgresRedoManager {
runtime,
tenantid,
conf,
process: Mutex::new(None),
processes,
next: AtomicUsize::new(0),
}
}
///
/// Process one request for WAL redo using wal-redo postgres
///
async fn handle_apply_request_postgres(
fn handle_apply_request_postgres(
&self,
process: &mut PostgresRedoProcess,
request: &WalRedoRequest,
@@ -278,14 +274,14 @@ impl PostgresRedoManager {
if let RelishTag::Relation(rel) = request.rel {
// Relational WAL records are applied using wal-redo-postgres
let buf_tag = BufferTag { rel, blknum };
apply_result = process.apply_wal_records(buf_tag, base_img, records).await;
apply_result = process.apply_wal_records(buf_tag, base_img, records);
let duration = start.elapsed();
debug!(
"postgres applied {} WAL records in {} ms to reconstruct page image at LSN {}",
trace!(
"postgres applied {} WAL records in {} us to reconstruct page image at LSN {}",
nrecords,
duration.as_millis(),
duration.as_micros(),
lsn
);
@@ -471,20 +467,22 @@ impl PostgresRedoManager {
struct PostgresRedoProcess {
stdin: ChildStdin,
stdout: ChildStdout,
stderr: ChildStderr,
}
impl PostgresRedoProcess {
//
// Start postgres binary in special WAL redo mode.
//
async fn launch(
fn launch(
conf: &PageServerConf,
process_no: usize,
tenantid: &ZTenantId,
) -> Result<PostgresRedoProcess, Error> {
// FIXME: We need a dummy Postgres cluster to run the process in. Currently, we
// just create one with constant name. That fails if you try to launch more than
// one WAL redo manager concurrently.
let datadir = conf.tenant_path(tenantid).join("wal-redo-datadir");
let datadir = conf.tenant_path(tenantid).join(format!("wal-redo-datadir-{}", process_no));
// Create empty data directory for wal-redo postgres, deleting old one first.
if datadir.exists() {
@@ -501,7 +499,6 @@ impl PostgresRedoProcess {
.env("LD_LIBRARY_PATH", conf.pg_lib_dir().to_str().unwrap())
.env("DYLD_LIBRARY_PATH", conf.pg_lib_dir().to_str().unwrap())
.output()
.await
.expect("failed to execute initdb");
if !initdb.status.success() {
@@ -538,102 +535,106 @@ impl PostgresRedoProcess {
datadir.display()
);
let stdin = child.stdin.take().expect("failed to open child's stdin");
let stderr = child.stderr.take().expect("failed to open child's stderr");
let stdout = child.stdout.take().expect("failed to open child's stdout");
// This async block reads the child's stderr, and forwards it to the logger
let f_stderr = async {
let mut stderr_buffered = tokio::io::BufReader::new(stderr);
let mut line = String::new();
loop {
let res = stderr_buffered.read_line(&mut line).await;
if res.is_err() {
debug!("could not convert line to utf-8");
continue;
}
if res.unwrap() == 0 {
break;
}
error!("wal-redo-postgres: {}", line.trim());
line.clear();
}
Ok::<(), Error>(())
};
tokio::spawn(f_stderr);
Ok(PostgresRedoProcess { stdin, stdout })
let stdin = child.stdin.take().unwrap();
let stdout = child.stdout.take().unwrap();
let stderr = child.stderr.take().unwrap();
Ok(PostgresRedoProcess { stdin, stdout, stderr })
}
//
// Apply given WAL records ('records') over an old page image. Returns
// new page image.
//
async fn apply_wal_records(
fn apply_wal_records(
&mut self,
tag: BufferTag,
base_img: Option<Bytes>,
records: &[(Lsn, WALRecord)],
) -> Result<Bytes, std::io::Error> {
let stdout = &mut self.stdout;
// Buffer the writes to avoid a lot of small syscalls.
let mut stdin = tokio::io::BufWriter::new(&mut self.stdin);
// We do three things simultaneously: send the old base image and WAL records to
// the child process's stdin, read the result from child's stdout, and forward any logging
// information that the child writes to its stderr to the page server's log.
//
// 'f_stdin' handles writing the base image and WAL records to the child process.
// 'f_stdout' below reads the result back. And 'f_stderr', which was spawned into the
// tokio runtime in the 'launch' function already, forwards the logging.
let f_stdin = async {
// Send base image, if any. (If the record initializes the page, previous page
// version is not needed.)
timeout(
TIMEOUT,
stdin.write_all(&build_begin_redo_for_block_msg(tag)),
)
.await??;
if let Some(img) = base_img {
timeout(TIMEOUT, stdin.write_all(&build_push_page_msg(tag, &img))).await??;
// Send base image, if any. (If the record initializes the page, previous page
// version is not needed.)
let mut buf: Vec<u8> = Vec::new();
build_begin_redo_for_block_msg(tag, &mut buf);
if let Some(img) = base_img {
build_push_page_msg(tag, &img, &mut buf);
}
// Send WAL records.
for (lsn, rec) in records.iter() {
WAL_REDO_RECORD_COUNTER.inc();
build_apply_record_msg(*lsn, &rec.rec, &mut buf);
//debug!("sent WAL record to wal redo postgres process ({:X}/{:X}",
// r.lsn >> 32, r.lsn & 0xffff_ffff);
}
//debug!("sent {} WAL records to wal redo postgres process ({:X}/{:X}",
// records.len(), lsn >> 32, lsn & 0xffff_ffff);
// Send GetPage command to get the result back
build_get_page_msg(tag, &mut buf);
// The input is now in 'buf'.
let mut nwrite = 0;
let mut resultbuf = Vec::new();
resultbuf.resize(8192, 0);
let mut nresult = 0;
let mut pollfds = [
PollFd::new(self.stdout.as_raw_fd(), PollFlags::POLLIN),
PollFd::new(self.stderr.as_raw_fd(), PollFlags::POLLIN),
PollFd::new(self.stdin.as_raw_fd(), PollFlags::POLLOUT),
];
// Do a blind write first
let n = self.stdin.write(&buf[nwrite..])?;
nwrite += n;
while nresult < 8192 {
let nfds = if nwrite < buf.len() {
3
} else {
2
};
nix::poll::poll(&mut pollfds[0..nfds], TIMEOUT.as_millis() as i32)?;
// We do three things simultaneously: send the old base image and WAL records to
// the child process's stdin, read the result from child's stdout, and forward any logging
// information that the child writes to its stderr to the page server's log.
//
// 'f_stdin' handles writing the base image and WAL records to the child process.
// 'f_stdout' below reads the result back. And 'f_stderr', which was spawned into the
// tokio runtime in the 'launch' function already, forwards the logging.
if nwrite < buf.len() && !pollfds[2].revents().unwrap().is_empty() {
// stdin
let n = self.stdin.write(&buf[nwrite..])?;
nwrite += n;
}
if !pollfds[0].revents().unwrap().is_empty() {
// stdout
// Read back new page image
let n = self.stdout.read(&mut resultbuf[nresult..])?;
// Send WAL records.
for (lsn, rec) in records.iter() {
WAL_REDO_RECORD_COUNTER.inc();
stdin
.write_all(&build_apply_record_msg(*lsn, &rec.rec))
.await?;
//debug!("sent WAL record to wal redo postgres process ({:X}/{:X}",
// r.lsn >> 32, r.lsn & 0xffff_ffff);
nresult += n;
}
//debug!("sent {} WAL records to wal redo postgres process ({:X}/{:X}",
// records.len(), lsn >> 32, lsn & 0xffff_ffff);
if !pollfds[1].revents().unwrap().is_empty() {
// stderr
let mut readbuf: [u8; 16384] = [0; 16384];
// Send GetPage command to get the result back
timeout(TIMEOUT, stdin.write_all(&build_get_page_msg(tag))).await??;
timeout(TIMEOUT, stdin.flush()).await??;
let n = self.stderr.read(&mut readbuf)?;
error!("wal-redo-postgres: {}", String::from_utf8_lossy(&readbuf[0..n]));
}
//debug!("sent GetPage for {}", tag.blknum);
Ok::<(), Error>(())
};
}
// Read back new page image
let f_stdout = async {
let mut buf = [0u8; 8192];
timeout(TIMEOUT, stdout.read_exact(&mut buf)).await??;
//debug!("got response for {}", tag.blknum);
Ok::<[u8; 8192], Error>(buf)
};
let res = tokio::try_join!(f_stdout, f_stdin)?;
let buf = res.0;
Ok::<Bytes, Error>(Bytes::from(std::vec::Vec::from(buf)))
Ok(Bytes::from(Vec::from(resultbuf)))
}
}
@@ -641,62 +642,50 @@ impl PostgresRedoProcess {
// process. See vendor/postgres/src/backend/tcop/zenith_wal_redo.c for
// explanation of the protocol.
fn build_begin_redo_for_block_msg(tag: BufferTag) -> Vec<u8> {
fn build_begin_redo_for_block_msg(tag: BufferTag, buf: &mut Vec<u8>) {
let len = 4 + 1 + 4 * 4;
let mut buf = Vec::with_capacity(1 + len);
buf.put_u8(b'B');
buf.put_u32(len as u32);
tag.ser_into(&mut buf)
tag.ser_into(buf)
.expect("serialize BufferTag should always succeed");
debug_assert!(buf.len() == 1 + len);
buf
//debug_assert!(buf.len() == 1 + len);
}
fn build_push_page_msg(tag: BufferTag, base_img: &[u8]) -> Vec<u8> {
fn build_push_page_msg(tag: BufferTag, base_img: &[u8], buf: &mut Vec<u8>) {
assert!(base_img.len() == 8192);
let len = 4 + 1 + 4 * 4 + base_img.len();
let mut buf = Vec::with_capacity(1 + len);
buf.put_u8(b'P');
buf.put_u32(len as u32);
tag.ser_into(&mut buf)
tag.ser_into(buf)
.expect("serialize BufferTag should always succeed");
buf.put(base_img);
debug_assert!(buf.len() == 1 + len);
buf
//debug_assert!(buf.len() - oldlen == 1 + len);
}
fn build_apply_record_msg(endlsn: Lsn, rec: &[u8]) -> Vec<u8> {
fn build_apply_record_msg(endlsn: Lsn, rec: &[u8], buf: &mut Vec<u8>) {
let len = 4 + 8 + rec.len();
let mut buf: Vec<u8> = Vec::with_capacity(1 + len);
buf.put_u8(b'A');
buf.put_u32(len as u32);
buf.put_u64(endlsn.0);
buf.put(rec);
debug_assert!(buf.len() == 1 + len);
buf
//debug_assert!(buf.len() - oldlen == 1 + len);
}
fn build_get_page_msg(tag: BufferTag) -> Vec<u8> {
fn build_get_page_msg(tag: BufferTag, buf: &mut Vec<u8>) {
let len = 4 + 1 + 4 * 4;
let mut buf = Vec::with_capacity(1 + len);
buf.put_u8(b'G');
buf.put_u32(len as u32);
tag.ser_into(&mut buf)
tag.ser_into(buf)
.expect("serialize BufferTag should always succeed");
debug_assert!(buf.len() == 1 + len);
buf
//debug_assert!(buf.len() == 1 + len);
}

View File

@@ -203,96 +203,38 @@ fn main() -> Result<()> {
)
.get_matches();
// Create config file
if let ("init", Some(init_match)) = matches.subcommand() {
let toml_file: String = if let Some(config_path) = init_match.value_of("config") {
// load and parse the file
std::fs::read_to_string(std::path::Path::new(config_path))
.with_context(|| format!("Could not read configuration file \"{}\"", config_path))?
} else {
// Built-in default config
default_conf()
let (sub_name, sub_args) = matches.subcommand();
let sub_args = sub_args.expect("no subcommand");
// Check for 'zenith init' command first.
let subcmd_result = if sub_name == "init" {
handle_init(sub_args)
} else {
// all other commands need an existing config
let env = match LocalEnv::load_config() {
Ok(conf) => conf,
Err(e) => {
eprintln!("Error loading config: {}", e);
exit(1);
}
};
let mut env = LocalEnv::create_config(&toml_file)
.with_context(|| "Failed to create zenith configuration")?;
env.init()
.with_context(|| "Failed to initialize zenith repository")?;
match sub_name {
"tenant" => handle_tenant(sub_args, &env),
"branch" => handle_branch(sub_args, &env),
"start" => handle_start_all(sub_args, &env),
"stop" => handle_stop_all(sub_args, &env),
"pageserver" => handle_pageserver(sub_args, &env),
"pg" => handle_pg(sub_args, &env),
"safekeeper" => handle_safekeeper(sub_args, &env),
_ => bail!("unexpected subcommand {}", sub_name),
}
};
if let Err(e) = subcmd_result {
eprintln!("command failed: {}", e);
exit(1);
}
// all other commands would need config
let env = match LocalEnv::load_config() {
Ok(conf) => conf,
Err(e) => {
eprintln!("Error loading config: {}", e);
exit(1);
}
};
match matches.subcommand() {
("init", Some(_sub_m)) => {
// The options were handled above already
let pageserver = PageServerNode::from_env(&env);
if let Err(e) = pageserver.init(
// default_tenantid was generated by the `env.init()` call above
Some(&env.default_tenantid.unwrap().to_string()),
) {
eprintln!("pageserver init failed: {}", e);
exit(1);
}
}
("tenant", Some(args)) => {
if let Err(e) = handle_tenant(args, &env) {
eprintln!("tenant command failed: {}", e);
exit(1);
}
}
("branch", Some(sub_args)) => {
if let Err(e) = handle_branch(sub_args, &env) {
eprintln!("branch command failed: {}", e);
exit(1);
}
}
("start", Some(sub_match)) => {
if let Err(e) = handle_start_all(sub_match, &env) {
eprintln!("start command failed: {}", e);
exit(1);
}
}
("stop", Some(sub_match)) => {
if let Err(e) = handle_stop_all(sub_match, &env) {
eprintln!("stop command failed: {}", e);
exit(1);
}
}
("pageserver", Some(sub_match)) => {
if let Err(e) = handle_pageserver(sub_match, &env) {
eprintln!("pageserver command failed: {}", e);
exit(1);
}
}
("pg", Some(pg_match)) => {
if let Err(e) = handle_pg(pg_match, &env) {
eprintln!("pg operation failed: {:?}", e);
exit(1);
}
}
("safekeeper", Some(sub_match)) => {
if let Err(e) = handle_safekeeper(sub_match, &env) {
eprintln!("safekeeper command failed: {}", e);
exit(1);
}
}
_ => {}
};
Ok(())
}
@@ -438,6 +380,35 @@ fn get_tenantid(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<ZTe
}
}
fn handle_init(init_match: &ArgMatches) -> Result<()> {
// Create config file
let toml_file: String = if let Some(config_path) = init_match.value_of("config") {
// load and parse the file
std::fs::read_to_string(std::path::Path::new(config_path))
.with_context(|| format!("Could not read configuration file \"{}\"", config_path))?
} else {
// Built-in default config
default_conf()
};
let mut env = LocalEnv::create_config(&toml_file)
.with_context(|| "Failed to create zenith configuration")?;
env.init()
.with_context(|| "Failed to initialize zenith repository")?;
// Call 'pageserver init'.
let pageserver = PageServerNode::from_env(&env);
if let Err(e) = pageserver.init(
// default_tenantid was generated by the `env.init()` call above
Some(&env.default_tenantid.unwrap().to_string()),
) {
eprintln!("pageserver init failed: {}", e);
exit(1);
}
Ok(())
}
fn handle_tenant(tenant_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
let pageserver = PageServerNode::from_env(env);
match tenant_match.subcommand() {
@@ -486,10 +457,11 @@ fn handle_branch(branch_match: &ArgMatches, env: &local_env::LocalEnv) -> Result
fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
let mut cplane = ComputeControlPlane::load(env.clone())?;
match pg_match.subcommand() {
("list", Some(list_match)) => {
let tenantid = get_tenantid(list_match, env)?;
// All subcommands take an optional --tenantid option
let tenantid = get_tenantid(pg_match, env)?;
match pg_match.subcommand() {
("list", Some(_list_match)) => {
let branch_infos = get_branch_infos(env, &tenantid).unwrap_or_else(|e| {
eprintln!("Failed to load branch info: {}", e);
HashMap::new()
@@ -520,7 +492,6 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
}
}
("create", Some(create_match)) => {
let tenantid = get_tenantid(create_match, env)?;
let node_name = create_match.value_of("node").unwrap_or("main");
let timeline_name = create_match.value_of("timeline").unwrap_or(node_name);
@@ -531,7 +502,6 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
cplane.new_node(tenantid, node_name, timeline_name, port)?;
}
("start", Some(start_match)) => {
let tenantid = get_tenantid(start_match, env)?;
let node_name = start_match.value_of("node").unwrap_or("main");
let timeline_name = start_match.value_of("timeline");
@@ -572,7 +542,6 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
}
}
("stop", Some(stop_match)) => {
let tenantid = get_tenantid(stop_match, env)?;
let node_name = stop_match.value_of("node").unwrap_or("main");
let destroy = stop_match.is_present("destroy");
@@ -636,26 +605,23 @@ fn get_safekeeper(env: &local_env::LocalEnv, name: &str) -> Result<SafekeeperNod
}
fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
match sub_match.subcommand() {
("start", Some(sub_match)) => {
let node_name = sub_match
.value_of("node")
.unwrap_or(DEFAULT_SAFEKEEPER_NAME);
let safekeeper = get_safekeeper(env, node_name)?;
let (sub_name, sub_args) = sub_match.subcommand();
let sub_args = sub_args.expect("no safekeeper subcommand");
// All the commands take an optional safekeeper name argument
let node_name = sub_args.value_of("node").unwrap_or(DEFAULT_SAFEKEEPER_NAME);
let safekeeper = get_safekeeper(env, node_name)?;
match sub_name {
"start" => {
if let Err(e) = safekeeper.start() {
eprintln!("safekeeper start failed: {}", e);
exit(1);
}
}
("stop", Some(sub_match)) => {
let node_name = sub_match
.value_of("node")
.unwrap_or(DEFAULT_SAFEKEEPER_NAME);
let immediate = sub_match.value_of("stop-mode") == Some("immediate");
let safekeeper = get_safekeeper(env, node_name)?;
"stop" => {
let immediate = sub_args.value_of("stop-mode") == Some("immediate");
if let Err(e) = safekeeper.stop(immediate) {
eprintln!("safekeeper stop failed: {}", e);
@@ -663,15 +629,10 @@ fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul
}
}
("restart", Some(sub_match)) => {
let node_name = sub_match
.value_of("node")
.unwrap_or(DEFAULT_SAFEKEEPER_NAME);
"restart" => {
let immediate = sub_args.value_of("stop-mode") == Some("immediate");
let safekeeper = get_safekeeper(env, node_name)?;
//TODO what shutdown strategy should we use here?
if let Err(e) = safekeeper.stop(false) {
if let Err(e) = safekeeper.stop(immediate) {
eprintln!("safekeeper stop failed: {}", e);
exit(1);
}
@@ -682,7 +643,9 @@ fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul
}
}
_ => {}
_ => {
bail!("Unexpected safekeeper subcommand '{}'", sub_name)
}
}
Ok(())
}