WIP: global job queue

This commit is contained in:
Heikki Linnakangas
2021-10-29 14:00:41 +03:00
parent 7cf7215ce2
commit ea56b4a36a
13 changed files with 559 additions and 204 deletions

View File

@@ -28,6 +28,7 @@ use daemonize::Daemonize;
use pageserver::{
branches, defaults::*, http, page_service, relish_storage, tenant_mgr, PageServerConf,
layered_repository,
RelishStorageConfig, RelishStorageKind, S3Config, LOG_FILE_NAME,
};
use zenith_utils::http::endpoint;
@@ -549,6 +550,8 @@ fn start_pageserver(conf: &'static PageServerConf) -> Result<()> {
page_service::thread_main(conf, auth, pageserver_listener, conf.auth_type)
})?;
let global_job_thread = layered_repository::launch_global_job_thread(conf);
for info in SignalsInfo::<WithOrigin>::new(TERM_SIGNALS)?.into_iter() {
match info.signal {
SIGQUIT => {
@@ -577,6 +580,12 @@ fn start_pageserver(conf: &'static PageServerConf) -> Result<()> {
.expect("thread panicked")
.expect("thread exited with an error");
}
// Shut down global job thread
global_job_thread
.join()
.expect("thread panicked");
info!("Pageserver shut down successfully completed");
exit(0);
}

View File

@@ -230,7 +230,7 @@ fn bootstrap_timeline(
timeline.writer().as_ref(),
lsn,
)?;
timeline.checkpoint()?;
timeline.checkpoint_forced()?;
println!(
"created initial timeline {} timeline.lsn {}",

View File

@@ -59,6 +59,7 @@ mod image_layer;
mod inmemory_layer;
mod interval_tree;
mod layer_map;
mod jobs;
mod page_versions;
mod storage_layer;
@@ -70,6 +71,8 @@ use layer_map::{LayerId, LayerMap};
use storage_layer::{
Layer, PageReconstructData, PageReconstructResult, SegmentTag, RELISH_SEG_SIZE,
};
use jobs::{GlobalJob, schedule_job};
pub use jobs::launch_global_job_thread;
static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; 8192]);
@@ -129,10 +132,17 @@ pub struct LayeredRepository {
/// Makes evey repo's timelines to backup their files to remote storage,
/// when they get frozen.
upload_relishes: bool,
is_gc_scheduled: Mutex<bool>,
}
/// Public interface
impl Repository for LayeredRepository {
fn upgrade_to_layered_repository(&self) -> &crate::layered_repository::LayeredRepository {
self
}
fn get_timeline(&self, timelineid: ZTimelineId) -> Result<Arc<dyn Timeline>> {
let mut timelines = self.timelines.lock().unwrap();
@@ -206,19 +216,33 @@ impl Repository for LayeredRepository {
/// Public entry point to GC. All the logic is in the private
/// gc_iteration_internal function, this public facade just wraps it for
/// metrics collection.
fn gc_iteration(
fn gc_manual(
&self,
target_timelineid: Option<ZTimelineId>,
horizon: u64,
checkpoint_before_gc: bool,
) -> Result<GcResult> {
STORAGE_TIME
.with_label_values(&["gc"])
.with_label_values(&["gc_manual"])
.observe_closure_duration(|| {
self.gc_iteration_internal(target_timelineid, horizon, checkpoint_before_gc)
})
}
fn gc_scheduled(&self) -> Result<GcResult> {
let result = STORAGE_TIME
.with_label_values(&["gc_scheduled"])
.observe_closure_duration(|| {
self.gc_iteration_internal(None, self.conf.gc_horizon, false)
});
let mut guard = self.is_gc_scheduled.lock().unwrap();
*guard = false;
result
}
// Wait for all threads to complete and persist repository data before pageserver shutdown.
fn shutdown(&self) -> Result<()> {
trace!("LayeredRepository shutdown for tenant {}", self.tenantid);
@@ -228,7 +252,7 @@ impl Repository for LayeredRepository {
walreceiver::stop_wal_receiver(*timelineid);
// Wait for syncing data to disk
trace!("repo shutdown. checkpoint timeline {}", timelineid);
timeline.checkpoint()?;
timeline.checkpoint_forced()?;
//TODO Wait for walredo process to shutdown too
}
@@ -306,6 +330,7 @@ impl LayeredRepository {
timelines: Mutex::new(HashMap::new()),
walredo_mgr,
upload_relishes,
is_gc_scheduled: Mutex::new(false),
}
}
@@ -355,44 +380,6 @@ impl LayeredRepository {
Ok(())
}
///
/// Launch the GC thread in given repository.
///
pub fn launch_gc_thread(
conf: &'static PageServerConf,
rc: Arc<LayeredRepository>,
) -> JoinHandle<()> {
std::thread::Builder::new()
.name("GC thread".into())
.spawn(move || {
// FIXME: relaunch it? Panic is not good.
rc.gc_loop(conf).expect("GC thread died");
})
.unwrap()
}
///
/// GC thread's main loop
///
fn gc_loop(&self, conf: &'static PageServerConf) -> Result<()> {
while !tenant_mgr::shutdown_requested() {
// Garbage collect old files that are not needed for PITR anymore
if conf.gc_horizon > 0 {
self.gc_iteration(None, conf.gc_horizon, false).unwrap();
}
// TODO Write it in more adequate way using
// condvar.wait_timeout() or something
let mut sleep_time = conf.gc_period.as_secs();
while sleep_time > 0 && !tenant_mgr::shutdown_requested() {
sleep_time -= 1;
std::thread::sleep(Duration::from_secs(1));
}
info!("gc thread for tenant {} waking up", self.tenantid);
}
Ok(())
}
/// Save timeline metadata to file
fn save_metadata(
conf: &'static PageServerConf,
@@ -546,7 +533,7 @@ impl LayeredRepository {
// so that they too can be garbage collected. That's
// used in tests, so we want as deterministic results as possible.
if checkpoint_before_gc {
timeline.checkpoint()?;
timeline.checkpoint_forced()?;
info!("timeline {} checkpoint_before_gc done", timelineid);
}
@@ -689,10 +676,18 @@ pub struct LayeredTimeline {
/// Must always be acquired before the layer map/individual layer lock
/// to avoid deadlock.
write_lock: Mutex<()>,
is_checkpoint_scheduled: Mutex<bool>,
last_gc: Mutex<Option<Lsn>>,
}
/// Public interface functions
impl Timeline for LayeredTimeline {
fn upgrade_to_layered_timeline(&self) -> &crate::layered_repository::LayeredTimeline {
self
}
fn get_ancestor_lsn(&self) -> Lsn {
self.ancestor_lsn
}
@@ -867,13 +862,25 @@ impl Timeline for LayeredTimeline {
/// Public entry point for checkpoint(). All the logic is in the private
/// checkpoint_internal function, this public facade just wraps it for
/// metrics collection.
fn checkpoint(&self) -> Result<()> {
fn checkpoint_forced(&self) -> Result<()> {
STORAGE_TIME
.with_label_values(&["checkpoint_force"])
.with_label_values(&["checkpoint_forced"])
//pass checkpoint_distance=0 to force checkpoint
.observe_closure_duration(|| self.checkpoint_internal(0, true))
}
fn checkpoint_scheduled(&self) -> Result<()> {
let result = STORAGE_TIME
.with_label_values(&["checkpoint_scheduled"])
.observe_closure_duration(|| self.checkpoint_internal(self.conf.checkpoint_distance, false));
let mut guard = self.is_checkpoint_scheduled.lock().unwrap();
*guard = false;
result
}
fn get_last_record_lsn(&self) -> Lsn {
self.last_record_lsn.load().last
}
@@ -977,6 +984,9 @@ impl LayeredTimeline {
upload_relishes,
write_lock: Mutex::new(()),
is_checkpoint_scheduled: Mutex::new(false),
last_gc: Mutex::new(None),
};
Ok(timeline)
}
@@ -1145,7 +1155,7 @@ impl LayeredTimeline {
///
/// Get a handle to the latest layer for appending.
///
fn get_layer_for_write(&self, seg: SegmentTag, lsn: Lsn) -> Result<Arc<InMemoryLayer>> {
fn get_layer_for_write(&self, seg: SegmentTag, lsn: Lsn) -> Result<Arc<dyn Layer>> {
let mut layers = self.layers.lock().unwrap();
assert!(lsn.is_aligned());
@@ -1160,7 +1170,9 @@ impl LayeredTimeline {
// Do we have a layer open for writing already?
let layer;
if let Some(open_layer) = layers.get_open(&seg) {
if let Some(open_layer_arc) = layers.get_open(&seg) {
let open_layer = open_layer_arc.upgrade_to_inmemory_layer().expect("open layer is not an in-memory layer");
if open_layer.get_start_lsn() > lsn {
bail!("unexpected open layer in the future");
}
@@ -1185,7 +1197,7 @@ impl LayeredTimeline {
lsn,
)?;
} else {
return Ok(open_layer);
return Ok(open_layer_arc);
}
}
// No writeable layer for this relation. Create one.
@@ -1276,9 +1288,10 @@ impl LayeredTimeline {
// a lot of memory and/or aren't receiving much updates anymore.
let mut disk_consistent_lsn = last_record_lsn;
let mut created_historics = false;
let mut layer_uploads = Vec::new();
while let Some((oldest_layer_id, oldest_layer, oldest_generation)) = layers.peek_oldest_open() {
while let Some((oldest_layer_id, oldest_layer_arc, oldest_generation)) = layers.peek_oldest_open() {
let oldest_layer = oldest_layer_arc.upgrade_to_inmemory_layer().expect("open layer is not an in-memory layer");
let oldest_pending_lsn = oldest_layer.get_oldest_pending_lsn();
if tenant_mgr::shutdown_requested() && !forced {
@@ -1307,42 +1320,14 @@ impl LayeredTimeline {
break;
}
// Mark the layer as no longer accepting writes and record the end_lsn.
// This happens in-place, no new layers are created now.
// We call `get_last_record_lsn` again, which may be different from the
// original load, as we may have released the write lock since then.
oldest_layer.freeze(self.get_last_record_lsn());
// The layer is no longer open, update the layer map to reflect this.
// We will replace it with on-disk historics below.
layers.remove(oldest_layer_id);
let oldest_layer_id = layers.insert_historic(oldest_layer.clone());
// Write the now-frozen layer to disk. That could take a while, so release the lock while do it
drop(layers);
drop(write_guard);
let new_historics = oldest_layer.write_to_disk(self)?;
// Evict the layer
self.evict_layer(oldest_layer_id)?;
write_guard = self.write_lock.lock().unwrap();
layers = self.layers.lock().unwrap();
if !new_historics.is_empty() {
created_historics = true;
}
// Finally, replace the frozen in-memory layer with the new on-disk layers
layers.remove(oldest_layer_id);
// Add the historics to the LayerMap
for delta_layer in new_historics.delta_layers {
layer_uploads.push(delta_layer.path());
layers.insert_historic(Arc::new(delta_layer));
}
for image_layer in new_historics.image_layers {
layer_uploads.push(image_layer.path());
layers.insert_historic(Arc::new(image_layer));
}
}
// Call unload() on all frozen layers, to release memory.
@@ -1355,14 +1340,6 @@ impl LayeredTimeline {
drop(layers);
drop(write_guard);
if created_historics {
// We must fsync the timeline dir to ensure the directory entries for
// new layer files are durable
let timeline_dir =
File::open(self.conf.timeline_path(&self.timelineid, &self.tenantid))?;
timeline_dir.sync_all()?;
}
// If we were able to advance 'disk_consistent_lsn', save it the metadata file.
// After crash, we will restart WAL streaming and processing from that point.
let old_disk_consistent_lsn = self.disk_consistent_lsn.load();
@@ -1408,6 +1385,131 @@ impl LayeredTimeline {
Ok(())
}
pub fn schedule_checkpoint_if_needed(&self) -> Result<()> {
let mut guard = self.is_checkpoint_scheduled.lock().unwrap();
if *guard == true {
return Ok(());
}
let RecordLsn {
last: last_record_lsn,
prev: _prev_record_lsn,
} = self.last_record_lsn.load();
let mut layers = self.layers.lock().unwrap();
if let Some((_oldest_layer_id, oldest_layer_arc, _oldest_generation)) = layers.peek_oldest_open() {
let oldest_layer = oldest_layer_arc.upgrade_to_inmemory_layer().expect("open layer is not an in-memory layer");
let oldest_pending_lsn = oldest_layer.get_oldest_pending_lsn();
let distance = last_record_lsn.widening_sub(oldest_pending_lsn);
if distance > self.conf.checkpoint_distance.into() {
schedule_job(GlobalJob::CheckpointTimeline(self.tenantid, self.timelineid));
*guard = true;
}
}
Ok(())
}
pub fn schedule_gc_if_needed(&self) -> Result<()> {
let RecordLsn {
last: last_record_lsn,
prev: _prev_record_lsn,
} = self.last_record_lsn.load();
let gc_needed = {
let last_gc = self.last_gc.lock().unwrap();
if let Some(last_gc) = *last_gc {
let distance = last_record_lsn.widening_sub(last_gc);
if distance > std::cmp::max(10*1024*1024, self.conf.gc_horizon / 2) as i128 {
true
} else {
false
}
} else {
true
}
};
if !gc_needed {
return Ok(());
}
let repo = tenant_mgr::get_repository_for_tenant(self.tenantid)?;
let repo = repo.upgrade_to_layered_repository();
let mut gc_scheduled = repo.is_gc_scheduled.lock().unwrap();
if *gc_scheduled == true {
return Ok(());
}
schedule_job(GlobalJob::GarbageCollect(self.tenantid));
*gc_scheduled = true;
Ok(())
}
fn evict_layer(&self, layer_id: LayerId) -> Result<()> {
let mut write_guard = self.write_lock.lock().unwrap();
let mut layers = self.layers.lock().unwrap();
if let Some(victim_layer_arc) = layers.get_with_id(layer_id) {
if let Some(victim_layer) = victim_layer_arc.upgrade_to_inmemory_layer() {
// Mark the layer as no longer accepting writes and record the end_lsn.
// This happens in-place, no new layers are created now.
// We call `get_last_record_lsn` again, which may be different from the
// original load, as we may have released the write lock since then.
victim_layer.freeze(self.get_last_record_lsn());
// The layer is no longer open, update the layer map to reflect this.
// We will replace it with on-disk historics below.
layers.remove(layer_id);
let frozen_layer_id = layers.insert_historic(victim_layer_arc.clone());
// Write the now-frozen layer to disk. That could take a while, so release the lock while do it
drop(layers);
drop(write_guard);
let new_historics = victim_layer.write_to_disk(self)?;
let created_historics = !new_historics.is_empty();
write_guard = self.write_lock.lock().unwrap();
layers = self.layers.lock().unwrap();
// Finally, replace the frozen in-memory layer with the new on-disk layers
layers.remove(frozen_layer_id);
// Add the historics to the LayerMap
for delta_layer in new_historics.delta_layers {
// FIXME layer_uploads.push(delta_layer.path());
layers.insert_historic(Arc::new(delta_layer));
}
for image_layer in new_historics.image_layers {
// FIXME layer_uploads.push(image_layer.path());
layers.insert_historic(Arc::new(image_layer));
}
if created_historics {
// We must fsync the timeline dir to ensure the directory entries for
// new layer files are durable
//
// TODO: it's inefficient to do this after every eviction, if we're evicting
// a lot of layers.
let timeline_dir =
File::open(self.conf.timeline_path(&self.timelineid, &self.tenantid))?;
timeline_dir.sync_all()?;
}
drop(layers);
drop(write_guard);
}
}
Ok(())
}
///
/// Garbage collect layer files on a timeline that are no longer needed.
///
@@ -1462,7 +1564,7 @@ impl LayeredTimeline {
// 1. Is it newer than cutoff point?
if l.get_end_lsn() > cutoff {
info!(
trace!(
"keeping {} {}-{} because it's newer than cutoff {}",
seg,
l.get_start_lsn(),
@@ -1481,7 +1583,7 @@ impl LayeredTimeline {
for retain_lsn in &retain_lsns {
// start_lsn is inclusive and end_lsn is exclusive
if l.get_start_lsn() <= *retain_lsn && *retain_lsn < l.get_end_lsn() {
info!(
trace!(
"keeping {} {}-{} because it's needed by branch point {}",
seg,
l.get_start_lsn(),
@@ -1500,7 +1602,7 @@ impl LayeredTimeline {
// 3. Is there a later on-disk layer for this relation?
if !l.is_dropped() && !layers.newer_image_layer_exists(l.get_seg_tag(), l.get_end_lsn())
{
info!(
trace!(
"keeping {} {}-{} because it is the latest layer",
seg,
l.get_start_lsn(),
@@ -1569,7 +1671,7 @@ impl LayeredTimeline {
}
if is_tombstone {
info!(
trace!(
"keeping {} {}-{} because this layer servers as a tombstome for older layer",
seg,
l.get_start_lsn(),
@@ -1600,20 +1702,25 @@ impl LayeredTimeline {
// (couldn't do this in the loop above, because you cannot modify a collection
// while iterating it. BTreeMap::retain() would be another option)
for doomed_layer_id in layers_to_remove {
let doomed_layer = layers.get_with_id(doomed_layer_id);
doomed_layer.delete()?;
layers.remove(doomed_layer_id);
match (
doomed_layer.is_dropped(),
doomed_layer.get_seg_tag().rel.is_relation(),
) {
(true, true) => result.ondisk_relfiles_dropped += 1,
(true, false) => result.ondisk_nonrelfiles_dropped += 1,
(false, true) => result.ondisk_relfiles_removed += 1,
(false, false) => result.ondisk_nonrelfiles_removed += 1,
if let Some(doomed_layer) = layers.get_with_id(doomed_layer_id) {
doomed_layer.delete()?;
layers.remove(doomed_layer_id);
match (
doomed_layer.is_dropped(),
doomed_layer.get_seg_tag().rel.is_relation(),
) {
(true, true) => result.ondisk_relfiles_dropped += 1,
(true, false) => result.ondisk_nonrelfiles_dropped += 1,
(false, true) => result.ondisk_relfiles_removed += 1,
(false, false) => result.ondisk_nonrelfiles_removed += 1,
}
}
}
let mut guard = self.last_gc.lock().unwrap();
*guard = Some(cutoff);
result.elapsed = now.elapsed();
Ok(result)
}
@@ -1806,9 +1913,10 @@ impl<'a> TimelineWriter for LayeredTimelineWriter<'a> {
let seg = SegmentTag::from_blknum(rel, blknum);
let layer = self.tl.get_layer_for_write(seg, lsn)?;
let delta_size = layer.put_wal_record(lsn, blknum, rec);
let delta_size = layer.upgrade_to_inmemory_layer().unwrap().put_wal_record(lsn, blknum, rec);
self.tl
.increase_current_logical_size(delta_size * BLCKSZ as u32);
Ok(())
}
@@ -1825,7 +1933,7 @@ impl<'a> TimelineWriter for LayeredTimelineWriter<'a> {
let seg = SegmentTag::from_blknum(rel, blknum);
let layer = self.tl.get_layer_for_write(seg, lsn)?;
let delta_size = layer.put_page_image(blknum, lsn, img);
let delta_size = layer.upgrade_to_inmemory_layer().unwrap().put_page_image(blknum, lsn, img);
self.tl
.increase_current_logical_size(delta_size * BLCKSZ as u32);
@@ -1870,7 +1978,7 @@ impl<'a> TimelineWriter for LayeredTimelineWriter<'a> {
};
let layer = self.tl.get_layer_for_write(seg, lsn)?;
layer.drop_segment(lsn);
layer.upgrade_to_inmemory_layer().unwrap().drop_segment(lsn);
}
// Truncate the last remaining segment to the specified size
@@ -1880,7 +1988,7 @@ impl<'a> TimelineWriter for LayeredTimelineWriter<'a> {
segno: last_remain_seg,
};
let layer = self.tl.get_layer_for_write(seg, lsn)?;
layer.put_truncation(lsn, relsize % RELISH_SEG_SIZE)
layer.upgrade_to_inmemory_layer().unwrap().put_truncation(lsn, relsize % RELISH_SEG_SIZE)
}
self.tl
.decrease_current_logical_size((oldsize - relsize) * BLCKSZ as u32);
@@ -1908,7 +2016,7 @@ impl<'a> TimelineWriter for LayeredTimelineWriter<'a> {
segno: remove_segno,
};
let layer = self.tl.get_layer_for_write(seg, lsn)?;
layer.drop_segment(lsn);
layer.upgrade_to_inmemory_layer().unwrap().drop_segment(lsn);
}
self.tl
.decrease_current_logical_size(oldsize * BLCKSZ as u32);
@@ -1922,7 +2030,7 @@ impl<'a> TimelineWriter for LayeredTimelineWriter<'a> {
// TODO handle TwoPhase relishes
let seg = SegmentTag::from_blknum(rel, 0);
let layer = self.tl.get_layer_for_write(seg, lsn)?;
layer.drop_segment(lsn);
layer.upgrade_to_inmemory_layer().unwrap().drop_segment(lsn);
}
Ok(())

View File

@@ -151,6 +151,10 @@ pub struct DeltaLayerInner {
}
impl Layer for DeltaLayer {
fn get_tenant_id(&self) -> ZTenantId {
self.tenantid
}
fn get_timeline_id(&self) -> ZTimelineId {
self.timelineid
}

View File

@@ -120,6 +120,10 @@ impl Layer for ImageLayer {
PathBuf::from(self.layer_name().to_string())
}
fn get_tenant_id(&self) -> ZTenantId {
self.tenantid
}
fn get_timeline_id(&self) -> ZTimelineId {
self.timelineid
}

View File

@@ -90,6 +90,11 @@ impl InMemoryLayerInner {
}
impl Layer for InMemoryLayer {
fn upgrade_to_inmemory_layer(&self) -> Option<&InMemoryLayer> {
Some(self)
}
// An in-memory layer doesn't really have a filename as it's not stored on disk,
// but we construct a filename as if it was a delta layer
fn filename(&self) -> PathBuf {
@@ -113,6 +118,10 @@ impl Layer for InMemoryLayer {
PathBuf::from(format!("inmem-{}", delta_filename))
}
fn get_tenant_id(&self) -> ZTenantId {
self.tenantid
}
fn get_timeline_id(&self) -> ZTimelineId {
self.timelineid
}

View File

@@ -0,0 +1,129 @@
use crate::tenant_mgr;
use crate::layered_repository::layer_map;
use crate::PageServerConf;
use anyhow::Result;
use lazy_static::lazy_static;
use tracing::*;
use std::collections::VecDeque;
use std::sync::Mutex;
use std::thread::JoinHandle;
use zenith_utils::zid::{ZTenantId, ZTimelineId};
lazy_static! {
static ref JOB_QUEUE: Mutex<GlobalJobQueue> = Mutex::new(GlobalJobQueue::default());
}
#[derive(Default)]
struct GlobalJobQueue {
jobs: VecDeque<GlobalJob>,
}
pub enum GlobalJob {
// To release memory
EvictSomeLayer,
// To advance 'disk_consistent_lsn'
CheckpointTimeline(ZTenantId, ZTimelineId),
// To free up disk space
GarbageCollect(ZTenantId),
}
pub fn schedule_job(job: GlobalJob) {
let mut queue = JOB_QUEUE.lock().unwrap();
queue.jobs.push_back(job);
}
///
/// Launch the global job handler thread
///
/// TODO: This ought to be a pool of threads
///
pub fn launch_global_job_thread(conf: &'static PageServerConf) -> JoinHandle<()> {
std::thread::Builder::new()
.name("Global Job thread".into())
.spawn(move || {
// FIXME: relaunch it? Panic is not good.
global_job_loop(conf).expect("Global job thread died");
})
.unwrap()
}
pub fn global_job_loop(conf: &'static PageServerConf) -> Result<()> {
while !tenant_mgr::shutdown_requested() {
std::thread::sleep(conf.checkpoint_period);
info!("global job thread waking up");
let mut queue = JOB_QUEUE.lock().unwrap();
while let Some(job) = queue.jobs.pop_front() {
drop(queue);
let result = match job {
GlobalJob::EvictSomeLayer => {
evict_layer()
},
GlobalJob::CheckpointTimeline(tenantid, timelineid) => {
checkpoint_timeline(tenantid, timelineid)
}
GlobalJob::GarbageCollect(tenantid) => {
gc_tenant(tenantid)
}
};
if let Err(err) = result {
error!("job ended in error: {:#}", err);
}
queue = JOB_QUEUE.lock().unwrap();
}
}
trace!("Checkpointer thread shut down");
Ok(())
}
// Freeze and write out an in-memory layer
fn evict_layer() -> Result<()>
{
// Pick a victim
while let Some(layer_id) = layer_map::find_victim() {
let victim_layer = match layer_map::get_layer(layer_id) {
Some(l) => l,
None => continue,
};
let tenantid = victim_layer.get_tenant_id();
let timelineid = victim_layer.get_timeline_id();
let _entered = info_span!("global evict", timeline = %timelineid, tenant = %tenantid)
.entered();
info!("evicting {}", victim_layer.filename().display());
drop(victim_layer);
let timeline = tenant_mgr::get_timeline_for_tenant(tenantid, timelineid)?;
timeline.upgrade_to_layered_timeline().evict_layer(layer_id)?
}
info!("no more eviction needed");
Ok(())
}
fn checkpoint_timeline(tenantid: ZTenantId, timelineid: ZTimelineId) -> Result<()> {
let timeline = tenant_mgr::get_timeline_for_tenant(tenantid, timelineid)?;
timeline.checkpoint_scheduled()
}
fn gc_tenant(tenantid: ZTenantId) -> Result<()> {
let tenant = tenant_mgr::get_repository_for_tenant(tenantid)?;
tenant.gc_scheduled()?;
Ok(())
}

View File

@@ -25,6 +25,7 @@
use crate::layered_repository::{schedule_job, GlobalJob};
use crate::layered_repository::interval_tree::{IntervalItem, IntervalIter, IntervalTree};
use crate::layered_repository::storage_layer::{Layer, SegmentTag};
use crate::layered_repository::InMemoryLayer;
@@ -34,6 +35,7 @@ use lazy_static::lazy_static;
use std::cmp::Ordering;
use std::collections::{BinaryHeap, HashMap};
use std::sync::{Arc, Mutex};
use tracing::*;
use zenith_metrics::{register_int_gauge, IntGauge};
use zenith_utils::lsn::Lsn;
@@ -49,86 +51,172 @@ lazy_static! {
static ref LAYERS: Mutex<GlobalLayerMap> = Mutex::new(GlobalLayerMap::new());
}
const MAX_LOADED_LAYERS: usize = 10;
const MAX_OPEN_LAYERS: usize = 10;
#[derive(Clone)]
enum GlobalLayerEntry {
InMemory(Arc<InMemoryLayer>),
Historic(Arc<dyn Layer>),
const MAX_LOADED_LAYERS: usize = 100;
struct GlobalLayerEntry {
tag: u64, // to fix ABA problem
layer: Option<Arc<dyn Layer>>,
usage_count: u32,
}
struct GlobalLayerMap {
layers: HashMap<LayerId, GlobalLayerEntry>,
last_id: u64,
open_layers: Vec<GlobalLayerEntry>,
clock_arm: u32,
// Layers currently loaded. We run a clock algorithm across these.
loaded_layers: Vec<LayerId>,
num_open_layers: usize,
eviction_scheduled: bool,
historic_layers: Vec<GlobalLayerEntry>,
}
impl GlobalLayerMap {
pub fn new() -> GlobalLayerMap {
GlobalLayerMap {
layers: HashMap::new(),
last_id: 0,
loaded_layers: Vec::new(),
open_layers: Vec::new(),
clock_arm: 0,
historic_layers: Vec::new(),
eviction_scheduled: false,
num_open_layers: 0,
}
}
pub fn get(&mut self, layer_id: LayerId) -> Arc<dyn Layer> {
match self.layers.get(&layer_id) {
Some(GlobalLayerEntry::InMemory(layer)) => layer.clone(),
Some(GlobalLayerEntry::Historic(layer)) => layer.clone(),
None => panic!()
pub fn get(&mut self, layer_id: LayerId) -> Option<Arc<dyn Layer>> {
let e = if layer_id.is_historic() {
let idx = (layer_id.index - 1) as usize;
&mut self.historic_layers[idx]
} else {
let idx = ((-layer_id.index) - 1) as usize;
&mut self.open_layers[idx]
};
if e.usage_count < 5 {
e.usage_count += 1;
}
}
pub fn get_open(&mut self, layer_id: LayerId) -> Arc<InMemoryLayer> {
match self.layers.get(&layer_id) {
Some(GlobalLayerEntry::InMemory(layer)) => layer.clone(),
Some(GlobalLayerEntry::Historic(_layer)) => panic!(),
None => panic!()
}
e.layer.clone()
}
pub fn insert_open(&mut self, layer: Arc<InMemoryLayer>) -> LayerId {
let layer_id = LayerId(self.last_id);
self.last_id += 1;
let index = -(self.open_layers.len() as isize + 1);
self.layers.insert(layer_id, GlobalLayerEntry::InMemory(layer));
let entry = GlobalLayerEntry {
layer: Some(layer),
usage_count: 1,
tag: 1,
};
let tag = entry.tag;
self.open_layers.push(entry);
self.num_open_layers += 1;
layer_id
NUM_INMEMORY_LAYERS.inc();
if !self.eviction_scheduled && self.num_open_layers >= MAX_OPEN_LAYERS {
info!("scheduling global eviction");
schedule_job(GlobalJob::EvictSomeLayer);
self.eviction_scheduled = true;
}
LayerId {
index,
tag,
}
}
pub fn insert_historic(&mut self, layer: Arc<dyn Layer>) -> LayerId {
let layer_id = LayerId(self.last_id);
self.last_id += 1;
let index = self.historic_layers.len() as isize + 1;
self.layers.insert(layer_id, GlobalLayerEntry::Historic(layer));
let entry = GlobalLayerEntry {
layer: Some(layer),
usage_count: 1,
tag: 1,
};
let tag = entry.tag;
self.historic_layers.push(entry);
layer_id
NUM_ONDISK_LAYERS.inc();
LayerId {
index,
tag,
}
}
pub fn remove(&mut self, layer_id: LayerId) -> GlobalLayerEntry {
if let Some(entry) = self.layers.remove(&layer_id) {
let orig_entry = entry.clone();
match orig_entry {
GlobalLayerEntry::InMemory(_layer) => {
NUM_INMEMORY_LAYERS.dec();
},
GlobalLayerEntry::Historic(_layer) => {
NUM_ONDISK_LAYERS.dec();
}
pub fn remove(&mut self, layer_id: LayerId) -> Option<Arc<dyn Layer>> {
let old_layer;
if layer_id.is_historic() {
let idx = (layer_id.index - 1) as usize;
old_layer = self.historic_layers[idx].layer.take();
if old_layer.is_some() {
NUM_ONDISK_LAYERS.dec();
}
entry.clone()
} else {
panic!()
let idx = ((-layer_id.index) - 1) as usize;
old_layer = self.open_layers[idx].layer.take();
if old_layer.is_some() {
NUM_INMEMORY_LAYERS.dec();
self.num_open_layers -= 1;
}
}
old_layer
}
pub fn find_victim(&mut self) -> Option<LayerId> {
// run the clock algorithm among all open layers
for _ in 0..self.open_layers.len() * 5 {
self.clock_arm += 1;
if self.clock_arm >= self.open_layers.len() as u32 {
self.clock_arm = 0;
}
let next = self.clock_arm as usize;
if self.open_layers[next].usage_count == 0 {
return Some(LayerId {
index: -((next + 1) as isize),
tag: self.open_layers[next].tag,
});
} else {
self.open_layers[next].usage_count -= 1;
}
}
None
}
}
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub struct LayerId(u64);
pub fn find_victim() -> Option<LayerId> {
let mut l = LAYERS.lock().unwrap();
if l.num_open_layers >= MAX_OPEN_LAYERS {
if let Some(x) = l.find_victim() {
info!("found victim out of {} open layers", l.num_open_layers);
Some(x)
} else {
info!("no victim found at {} open layers", l.num_open_layers);
None
}
} else {
info!("no victim needed at {} open layers", l.num_open_layers);
l.eviction_scheduled = false;
None
}
}
pub fn get_layer(layer_id: LayerId) -> Option<Arc<dyn Layer>> {
LAYERS.lock().unwrap().get(layer_id)
}
#[derive(Clone, Copy, PartialEq, Eq)]
pub struct LayerId {
index: isize,
tag: u64
}
impl LayerId {
pub fn is_historic(&self) -> bool {
self.index > 0
}
}
///
/// LayerMap tracks what layers exist on a timeline.
@@ -161,7 +249,7 @@ impl LayerMap {
segentry.get(lsn)
}
pub fn get_with_id(&self, layer_id: LayerId) -> Arc<dyn Layer> {
pub fn get_with_id(&self, layer_id: LayerId) -> Option<Arc<dyn Layer>> {
// TODO: check that it belongs to this tenant+timeline
LAYERS.lock().unwrap().get(layer_id)
}
@@ -170,15 +258,11 @@ impl LayerMap {
/// Get the open layer for given segment for writing. Or None if no open
/// layer exists.
///
pub fn get_open(&self, tag: &SegmentTag) -> Option<Arc<InMemoryLayer>> {
pub fn get_open(&self, tag: &SegmentTag) -> Option<Arc<dyn Layer>> {
let segentry = self.segs.get(tag)?;
if let Some((layer_id, _start_lsn)) = segentry.open {
Some(LAYERS.lock().unwrap().get_open(layer_id))
} else {
None
}
let (layer_id, _start_lsn) = segentry.open?;
LAYERS.lock().unwrap().get(layer_id)
}
///
@@ -206,24 +290,19 @@ impl LayerMap {
generation: self.current_generation,
};
self.open_layers.push(open_layer_entry);
NUM_INMEMORY_LAYERS.inc();
}
/// Remove the oldest in-memory layer
/// Remove a layer
pub fn remove(&mut self, layer_id: LayerId) {
let layer_entry = LAYERS.lock().unwrap().remove(layer_id);
// Also remove it from the SegEntry of this segment
match layer_entry {
GlobalLayerEntry::InMemory(layer) => {
if let Some(layer) = LAYERS.lock().unwrap().remove(layer_id) {
// Also remove it from the SegEntry of this segment
if layer_id.is_historic() {
let tag = layer.get_seg_tag();
if let Some(segentry) = self.segs.get_mut(&tag) {
segentry.historic.remove(&HistoricLayerIntervalTreeEntry::new(layer_id, layer));
}
}
GlobalLayerEntry::Historic(layer) => {
} else {
let segtag = layer.get_seg_tag();
let mut segentry = self.segs.get_mut(&segtag).unwrap();
if let Some(open) = segentry.open {
@@ -311,15 +390,18 @@ impl LayerMap {
}
/// Return the oldest in-memory layer, along with its generation number.
pub fn peek_oldest_open(&self) -> Option<(LayerId, Arc<InMemoryLayer>, u64)> {
pub fn peek_oldest_open(&mut self) -> Option<(LayerId, Arc<dyn Layer>, u64)> {
if let Some(oldest_entry) = self.open_layers.peek() {
Some((oldest_entry.layer_id,
LAYERS.lock().unwrap().get_open(oldest_entry.layer_id),
oldest_entry.generation))
} else {
None
while let Some(oldest_entry) = self.open_layers.peek() {
if let Some(layer) = LAYERS.lock().unwrap().get(oldest_entry.layer_id) {
return Some((oldest_entry.layer_id,
layer,
oldest_entry.generation));
} else {
self.open_layers.pop();
}
}
None
}
/// Increment the generation number used to stamp open in-memory layers. Layers
@@ -417,14 +499,15 @@ impl SegEntry {
pub fn get(&self, lsn: Lsn) -> Option<Arc<dyn Layer>> {
if let Some(open) = &self.open {
let layer = LAYERS.lock().unwrap().get(open.0);
if layer.get_start_lsn() <= lsn {
return Some(layer);
if let Some(layer) = LAYERS.lock().unwrap().get(open.0) {
if layer.get_start_lsn() <= lsn {
return Some(layer);
}
}
}
if let Some(historic) = self.historic.search(lsn) {
Some(LAYERS.lock().unwrap().get(historic.layer_id))
Some(LAYERS.lock().unwrap().get(historic.layer_id).unwrap())
} else {
None
}
@@ -437,7 +520,7 @@ impl SegEntry {
self.historic
.iter_newer(lsn)
.any(|e| {
let layer = LAYERS.lock().unwrap().get(e.layer_id);
let layer = LAYERS.lock().unwrap().get(e.layer_id).unwrap();
!layer.is_incremental()
}
)
@@ -504,7 +587,7 @@ impl<'a> Iterator for HistoricLayerIter<'a> {
loop {
if let Some(x) = &mut self.iter {
if let Some(x) = x.next() {
let layer = LAYERS.lock().unwrap().get(x.layer_id);
let layer = LAYERS.lock().unwrap().get(x.layer_id).unwrap();
return Some((x.layer_id, layer));
}
}

View File

@@ -2,9 +2,10 @@
//! Common traits and structs for layers
//!
use crate::layered_repository::InMemoryLayer;
use crate::relish::RelishTag;
use crate::repository::WALRecord;
use crate::ZTimelineId;
use crate::{ZTenantId, ZTimelineId};
use anyhow::Result;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
@@ -104,6 +105,14 @@ pub enum PageReconstructResult {
/// in-memory and on-disk layers.
///
pub trait Layer: Send + Sync {
fn upgrade_to_inmemory_layer(&self) -> Option<&InMemoryLayer> {
None
}
/// Identify the timeline this relish belongs to
fn get_tenant_id(&self) -> ZTenantId;
/// Identify the timeline this relish belongs to
fn get_timeline_id(&self) -> ZTimelineId;

View File

@@ -690,7 +690,7 @@ impl postgres_backend::Handler for PageServerHandler {
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
let result = repo.gc_iteration(Some(timelineid), gc_horizon, true)?;
let result = repo.gc_manual(Some(timelineid), gc_horizon, true)?;
pgb.write_message_noflush(&BeMessage::RowDescription(&[
RowDescriptor::int8_col(b"layer_relfiles_total"),

View File

@@ -33,12 +33,16 @@ pub trait Repository: Send + Sync {
/// `checkpoint_before_gc` parameter is used to force compaction of storage before CG
/// to make tests more deterministic.
/// TODO Do we still need it or we can call checkpoint explicitly in tests where needed?
fn gc_iteration(
fn gc_manual(
&self,
timelineid: Option<ZTimelineId>,
horizon: u64,
checkpoint_before_gc: bool,
) -> Result<GcResult>;
fn gc_scheduled(&self) -> Result<GcResult>;
fn upgrade_to_layered_repository(&self) -> &crate::layered_repository::LayeredRepository;
}
///
@@ -144,7 +148,9 @@ pub trait Timeline: Send + Sync {
///
/// NOTE: This has nothing to do with checkpoint in PostgreSQL. We don't
/// know anything about them here in the repository.
fn checkpoint(&self) -> Result<()>;
fn checkpoint_forced(&self) -> Result<()>;
fn checkpoint_scheduled(&self) -> Result<()>;
/// Retrieve current logical size of the timeline
///
@@ -155,6 +161,8 @@ pub trait Timeline: Send + Sync {
/// Does the same as get_current_logical_size but counted on demand.
/// Used in tests to ensure thet incremental and non incremental variants match.
fn get_current_logical_size_non_incremental(&self, lsn: Lsn) -> Result<usize>;
fn upgrade_to_layered_timeline(&self) -> &crate::layered_repository::LayeredTimeline;
}
/// Various functions to mutate the timeline.
@@ -714,8 +722,8 @@ mod tests {
.contains(&TESTREL_A));
// Run checkpoint and garbage collection and check that it's still not visible
newtline.checkpoint()?;
repo.gc_iteration(Some(NEW_TIMELINE_ID), 0, true)?;
newtline.checkpoint_forced()?;
repo.gc_manual(Some(NEW_TIMELINE_ID), 0, true)?;
assert!(!newtline
.list_rels(0, TESTDB, Lsn(0x40))?

View File

@@ -106,17 +106,6 @@ fn init_repo(conf: &'static PageServerConf, tenant_id: ZTenantId) {
true,
));
let checkpointer_handle = LayeredRepository::launch_checkpointer_thread(conf, repo.clone());
let gc_handle = LayeredRepository::launch_gc_thread(conf, repo.clone());
let mut handles = TENANT_HANDLES.lock().unwrap();
let h = TenantHandleEntry {
checkpointer_handle: Some(checkpointer_handle),
gc_handle: Some(gc_handle),
};
handles.insert(tenant_id, h);
let mut m = access_tenants();
let tenant = m.get_mut(&tenant_id).unwrap();
tenant.repo = Some(repo);

View File

@@ -251,6 +251,9 @@ fn walreceiver_main(
last_rec_lsn = lsn;
}
timeline.upgrade_to_layered_timeline().schedule_checkpoint_if_needed()?;
timeline.upgrade_to_layered_timeline().schedule_gc_if_needed()?;
if !caught_up && endlsn >= end_of_wal {
info!("caught up at LSN {}", endlsn);
caught_up = true;