From ea56b4a36a5465b55e234e75d4600346d3706684 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Fri, 29 Oct 2021 14:00:41 +0300 Subject: [PATCH] WIP: global job queue --- pageserver/src/bin/pageserver.rs | 9 + pageserver/src/branches.rs | 2 +- pageserver/src/layered_repository.rs | 324 ++++++++++++------ .../src/layered_repository/delta_layer.rs | 4 + .../src/layered_repository/image_layer.rs | 4 + .../src/layered_repository/inmemory_layer.rs | 9 + pageserver/src/layered_repository/jobs.rs | 129 +++++++ .../src/layered_repository/layer_map.rs | 239 ++++++++----- .../src/layered_repository/storage_layer.rs | 11 +- pageserver/src/page_service.rs | 2 +- pageserver/src/repository.rs | 16 +- pageserver/src/tenant_mgr.rs | 11 - pageserver/src/walreceiver.rs | 3 + 13 files changed, 559 insertions(+), 204 deletions(-) create mode 100644 pageserver/src/layered_repository/jobs.rs diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 3a577476dc..9ae1e63996 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -28,6 +28,7 @@ use daemonize::Daemonize; use pageserver::{ branches, defaults::*, http, page_service, relish_storage, tenant_mgr, PageServerConf, + layered_repository, RelishStorageConfig, RelishStorageKind, S3Config, LOG_FILE_NAME, }; use zenith_utils::http::endpoint; @@ -549,6 +550,8 @@ fn start_pageserver(conf: &'static PageServerConf) -> Result<()> { page_service::thread_main(conf, auth, pageserver_listener, conf.auth_type) })?; + let global_job_thread = layered_repository::launch_global_job_thread(conf); + for info in SignalsInfo::::new(TERM_SIGNALS)?.into_iter() { match info.signal { SIGQUIT => { @@ -577,6 +580,12 @@ fn start_pageserver(conf: &'static PageServerConf) -> Result<()> { .expect("thread panicked") .expect("thread exited with an error"); } + + // Shut down global job thread + global_job_thread + .join() + .expect("thread panicked"); + info!("Pageserver shut down successfully 
completed"); exit(0); } diff --git a/pageserver/src/branches.rs b/pageserver/src/branches.rs index ba922acdb7..a16c361083 100644 --- a/pageserver/src/branches.rs +++ b/pageserver/src/branches.rs @@ -230,7 +230,7 @@ fn bootstrap_timeline( timeline.writer().as_ref(), lsn, )?; - timeline.checkpoint()?; + timeline.checkpoint_forced()?; println!( "created initial timeline {} timeline.lsn {}", diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index fd4131b5d6..c9a370d3d3 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -59,6 +59,7 @@ mod image_layer; mod inmemory_layer; mod interval_tree; mod layer_map; +mod jobs; mod page_versions; mod storage_layer; @@ -70,6 +71,8 @@ use layer_map::{LayerId, LayerMap}; use storage_layer::{ Layer, PageReconstructData, PageReconstructResult, SegmentTag, RELISH_SEG_SIZE, }; +use jobs::{GlobalJob, schedule_job}; +pub use jobs::launch_global_job_thread; static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; 8192]); @@ -129,10 +132,17 @@ pub struct LayeredRepository { /// Makes evey repo's timelines to backup their files to remote storage, /// when they get frozen. upload_relishes: bool, + + is_gc_scheduled: Mutex, } /// Public interface impl Repository for LayeredRepository { + + fn upgrade_to_layered_repository(&self) -> &crate::layered_repository::LayeredRepository { + self + } + fn get_timeline(&self, timelineid: ZTimelineId) -> Result> { let mut timelines = self.timelines.lock().unwrap(); @@ -206,19 +216,33 @@ impl Repository for LayeredRepository { /// Public entry point to GC. All the logic is in the private /// gc_iteration_internal function, this public facade just wraps it for /// metrics collection. 
- fn gc_iteration( + fn gc_manual( &self, target_timelineid: Option, horizon: u64, checkpoint_before_gc: bool, ) -> Result { STORAGE_TIME - .with_label_values(&["gc"]) + .with_label_values(&["gc_manual"]) .observe_closure_duration(|| { self.gc_iteration_internal(target_timelineid, horizon, checkpoint_before_gc) }) } + fn gc_scheduled(&self) -> Result { + let result = STORAGE_TIME + .with_label_values(&["gc_scheduled"]) + .observe_closure_duration(|| { + self.gc_iteration_internal(None, self.conf.gc_horizon, false) + }); + + let mut guard = self.is_gc_scheduled.lock().unwrap(); + + *guard = false; + + result + } + // Wait for all threads to complete and persist repository data before pageserver shutdown. fn shutdown(&self) -> Result<()> { trace!("LayeredRepository shutdown for tenant {}", self.tenantid); @@ -228,7 +252,7 @@ impl Repository for LayeredRepository { walreceiver::stop_wal_receiver(*timelineid); // Wait for syncing data to disk trace!("repo shutdown. checkpoint timeline {}", timelineid); - timeline.checkpoint()?; + timeline.checkpoint_forced()?; //TODO Wait for walredo process to shutdown too } @@ -306,6 +330,7 @@ impl LayeredRepository { timelines: Mutex::new(HashMap::new()), walredo_mgr, upload_relishes, + is_gc_scheduled: Mutex::new(false), } } @@ -355,44 +380,6 @@ impl LayeredRepository { Ok(()) } - /// - /// Launch the GC thread in given repository. - /// - pub fn launch_gc_thread( - conf: &'static PageServerConf, - rc: Arc, - ) -> JoinHandle<()> { - std::thread::Builder::new() - .name("GC thread".into()) - .spawn(move || { - // FIXME: relaunch it? Panic is not good. 
- rc.gc_loop(conf).expect("GC thread died"); - }) - .unwrap() - } - - /// - /// GC thread's main loop - /// - fn gc_loop(&self, conf: &'static PageServerConf) -> Result<()> { - while !tenant_mgr::shutdown_requested() { - // Garbage collect old files that are not needed for PITR anymore - if conf.gc_horizon > 0 { - self.gc_iteration(None, conf.gc_horizon, false).unwrap(); - } - - // TODO Write it in more adequate way using - // condvar.wait_timeout() or something - let mut sleep_time = conf.gc_period.as_secs(); - while sleep_time > 0 && !tenant_mgr::shutdown_requested() { - sleep_time -= 1; - std::thread::sleep(Duration::from_secs(1)); - } - info!("gc thread for tenant {} waking up", self.tenantid); - } - Ok(()) - } - /// Save timeline metadata to file fn save_metadata( conf: &'static PageServerConf, @@ -546,7 +533,7 @@ impl LayeredRepository { // so that they too can be garbage collected. That's // used in tests, so we want as deterministic results as possible. if checkpoint_before_gc { - timeline.checkpoint()?; + timeline.checkpoint_forced()?; info!("timeline {} checkpoint_before_gc done", timelineid); } @@ -689,10 +676,18 @@ pub struct LayeredTimeline { /// Must always be acquired before the layer map/individual layer lock /// to avoid deadlock. write_lock: Mutex<()>, + + is_checkpoint_scheduled: Mutex, + last_gc: Mutex>, } /// Public interface functions impl Timeline for LayeredTimeline { + + fn upgrade_to_layered_timeline(&self) -> &crate::layered_repository::LayeredTimeline { + self + } + fn get_ancestor_lsn(&self) -> Lsn { self.ancestor_lsn } @@ -867,13 +862,25 @@ impl Timeline for LayeredTimeline { /// Public entry point for checkpoint(). All the logic is in the private /// checkpoint_internal function, this public facade just wraps it for /// metrics collection. 
- fn checkpoint(&self) -> Result<()> { + fn checkpoint_forced(&self) -> Result<()> { STORAGE_TIME - .with_label_values(&["checkpoint_force"]) + .with_label_values(&["checkpoint_forced"]) //pass checkpoint_distance=0 to force checkpoint .observe_closure_duration(|| self.checkpoint_internal(0, true)) } + fn checkpoint_scheduled(&self) -> Result<()> { + + let result = STORAGE_TIME + .with_label_values(&["checkpoint_scheduled"]) + .observe_closure_duration(|| self.checkpoint_internal(self.conf.checkpoint_distance, false)); + + let mut guard = self.is_checkpoint_scheduled.lock().unwrap(); + *guard = false; + + result + } + fn get_last_record_lsn(&self) -> Lsn { self.last_record_lsn.load().last } @@ -977,6 +984,9 @@ impl LayeredTimeline { upload_relishes, write_lock: Mutex::new(()), + + is_checkpoint_scheduled: Mutex::new(false), + last_gc: Mutex::new(None), }; Ok(timeline) } @@ -1145,7 +1155,7 @@ impl LayeredTimeline { /// /// Get a handle to the latest layer for appending. /// - fn get_layer_for_write(&self, seg: SegmentTag, lsn: Lsn) -> Result> { + fn get_layer_for_write(&self, seg: SegmentTag, lsn: Lsn) -> Result> { let mut layers = self.layers.lock().unwrap(); assert!(lsn.is_aligned()); @@ -1160,7 +1170,9 @@ impl LayeredTimeline { // Do we have a layer open for writing already? let layer; - if let Some(open_layer) = layers.get_open(&seg) { + if let Some(open_layer_arc) = layers.get_open(&seg) { + let open_layer = open_layer_arc.upgrade_to_inmemory_layer().expect("open layer is not an in-memory layer"); + if open_layer.get_start_lsn() > lsn { bail!("unexpected open layer in the future"); } @@ -1185,7 +1197,7 @@ impl LayeredTimeline { lsn, )?; } else { - return Ok(open_layer); + return Ok(open_layer_arc); } } // No writeable layer for this relation. Create one. @@ -1276,9 +1288,10 @@ impl LayeredTimeline { // a lot of memory and/or aren't receiving much updates anymore. 
let mut disk_consistent_lsn = last_record_lsn; - let mut created_historics = false; let mut layer_uploads = Vec::new(); - while let Some((oldest_layer_id, oldest_layer, oldest_generation)) = layers.peek_oldest_open() { + while let Some((oldest_layer_id, oldest_layer_arc, oldest_generation)) = layers.peek_oldest_open() { + let oldest_layer = oldest_layer_arc.upgrade_to_inmemory_layer().expect("open layer is not an in-memory layer"); + let oldest_pending_lsn = oldest_layer.get_oldest_pending_lsn(); if tenant_mgr::shutdown_requested() && !forced { @@ -1307,42 +1320,14 @@ impl LayeredTimeline { break; } - // Mark the layer as no longer accepting writes and record the end_lsn. - // This happens in-place, no new layers are created now. - // We call `get_last_record_lsn` again, which may be different from the - // original load, as we may have released the write lock since then. - oldest_layer.freeze(self.get_last_record_lsn()); - - // The layer is no longer open, update the layer map to reflect this. - // We will replace it with on-disk historics below. - layers.remove(oldest_layer_id); - let oldest_layer_id = layers.insert_historic(oldest_layer.clone()); - - // Write the now-frozen layer to disk. 
That could take a while, so release the lock while do it drop(layers); drop(write_guard); - let new_historics = oldest_layer.write_to_disk(self)?; + // Evict the layer + self.evict_layer(oldest_layer_id)?; write_guard = self.write_lock.lock().unwrap(); layers = self.layers.lock().unwrap(); - - if !new_historics.is_empty() { - created_historics = true; - } - - // Finally, replace the frozen in-memory layer with the new on-disk layers - layers.remove(oldest_layer_id); - - // Add the historics to the LayerMap - for delta_layer in new_historics.delta_layers { - layer_uploads.push(delta_layer.path()); - layers.insert_historic(Arc::new(delta_layer)); - } - for image_layer in new_historics.image_layers { - layer_uploads.push(image_layer.path()); - layers.insert_historic(Arc::new(image_layer)); - } } // Call unload() on all frozen layers, to release memory. @@ -1355,14 +1340,6 @@ impl LayeredTimeline { drop(layers); drop(write_guard); - if created_historics { - // We must fsync the timeline dir to ensure the directory entries for - // new layer files are durable - let timeline_dir = - File::open(self.conf.timeline_path(&self.timelineid, &self.tenantid))?; - timeline_dir.sync_all()?; - } - // If we were able to advance 'disk_consistent_lsn', save it the metadata file. // After crash, we will restart WAL streaming and processing from that point. 
let old_disk_consistent_lsn = self.disk_consistent_lsn.load(); @@ -1408,6 +1385,131 @@ impl LayeredTimeline { Ok(()) } + pub fn schedule_checkpoint_if_needed(&self) -> Result<()> { + + let mut guard = self.is_checkpoint_scheduled.lock().unwrap(); + if *guard == true { + return Ok(()); + } + + let RecordLsn { + last: last_record_lsn, + prev: _prev_record_lsn, + } = self.last_record_lsn.load(); + + let mut layers = self.layers.lock().unwrap(); + if let Some((_oldest_layer_id, oldest_layer_arc, _oldest_generation)) = layers.peek_oldest_open() { + let oldest_layer = oldest_layer_arc.upgrade_to_inmemory_layer().expect("open layer is not an in-memory layer"); + let oldest_pending_lsn = oldest_layer.get_oldest_pending_lsn(); + let distance = last_record_lsn.widening_sub(oldest_pending_lsn); + if distance > self.conf.checkpoint_distance.into() { + schedule_job(GlobalJob::CheckpointTimeline(self.tenantid, self.timelineid)); + *guard = true; + } + } + Ok(()) + } + + pub fn schedule_gc_if_needed(&self) -> Result<()> { + + let RecordLsn { + last: last_record_lsn, + prev: _prev_record_lsn, + } = self.last_record_lsn.load(); + + let gc_needed = { + let last_gc = self.last_gc.lock().unwrap(); + + if let Some(last_gc) = *last_gc { + let distance = last_record_lsn.widening_sub(last_gc); + if distance > std::cmp::max(10*1024*1024, self.conf.gc_horizon / 2) as i128 { + true + } else { + false + } + } else { + true + } + }; + + if !gc_needed { + return Ok(()); + } + + let repo = tenant_mgr::get_repository_for_tenant(self.tenantid)?; + let repo = repo.upgrade_to_layered_repository(); + + let mut gc_scheduled = repo.is_gc_scheduled.lock().unwrap(); + if *gc_scheduled == true { + return Ok(()); + } + + schedule_job(GlobalJob::GarbageCollect(self.tenantid)); + *gc_scheduled = true; + + Ok(()) + } + + fn evict_layer(&self, layer_id: LayerId) -> Result<()> { + let mut write_guard = self.write_lock.lock().unwrap(); + let mut layers = self.layers.lock().unwrap(); + + if let 
Some(victim_layer_arc) = layers.get_with_id(layer_id) { + + if let Some(victim_layer) = victim_layer_arc.upgrade_to_inmemory_layer() { + + // Mark the layer as no longer accepting writes and record the end_lsn. + // This happens in-place, no new layers are created now. + // We call `get_last_record_lsn` again, which may be different from the + // original load, as we may have released the write lock since then. + victim_layer.freeze(self.get_last_record_lsn()); + + // The layer is no longer open, update the layer map to reflect this. + // We will replace it with on-disk historics below. + layers.remove(layer_id); + + let frozen_layer_id = layers.insert_historic(victim_layer_arc.clone()); + + // Write the now-frozen layer to disk. That could take a while, so release the lock while we do it + drop(layers); + drop(write_guard); + + let new_historics = victim_layer.write_to_disk(self)?; + let created_historics = !new_historics.is_empty(); + + write_guard = self.write_lock.lock().unwrap(); + layers = self.layers.lock().unwrap(); + + // Finally, replace the frozen in-memory layer with the new on-disk layers + layers.remove(frozen_layer_id); + + // Add the historics to the LayerMap + for delta_layer in new_historics.delta_layers { + // FIXME layer_uploads.push(delta_layer.path()); + layers.insert_historic(Arc::new(delta_layer)); + } + for image_layer in new_historics.image_layers { + // FIXME layer_uploads.push(image_layer.path()); + layers.insert_historic(Arc::new(image_layer)); + } + + if created_historics { + // We must fsync the timeline dir to ensure the directory entries for + // new layer files are durable + // + // TODO: it's inefficient to do this after every eviction, if we're evicting + // a lot of layers.
+ let timeline_dir = + File::open(self.conf.timeline_path(&self.timelineid, &self.tenantid))?; + timeline_dir.sync_all()?; + } + drop(layers); + drop(write_guard); + } + } + Ok(()) + } + /// /// Garbage collect layer files on a timeline that are no longer needed. /// @@ -1462,7 +1564,7 @@ impl LayeredTimeline { // 1. Is it newer than cutoff point? if l.get_end_lsn() > cutoff { - info!( + trace!( "keeping {} {}-{} because it's newer than cutoff {}", seg, l.get_start_lsn(), @@ -1481,7 +1583,7 @@ impl LayeredTimeline { for retain_lsn in &retain_lsns { // start_lsn is inclusive and end_lsn is exclusive if l.get_start_lsn() <= *retain_lsn && *retain_lsn < l.get_end_lsn() { - info!( + trace!( "keeping {} {}-{} because it's needed by branch point {}", seg, l.get_start_lsn(), @@ -1500,7 +1602,7 @@ impl LayeredTimeline { // 3. Is there a later on-disk layer for this relation? if !l.is_dropped() && !layers.newer_image_layer_exists(l.get_seg_tag(), l.get_end_lsn()) { - info!( + trace!( "keeping {} {}-{} because it is the latest layer", seg, l.get_start_lsn(), @@ -1569,7 +1671,7 @@ impl LayeredTimeline { } if is_tombstone { - info!( + trace!( "keeping {} {}-{} because this layer servers as a tombstome for older layer", seg, l.get_start_lsn(), @@ -1600,20 +1702,25 @@ impl LayeredTimeline { // (couldn't do this in the loop above, because you cannot modify a collection // while iterating it. 
BTreeMap::retain() would be another option) for doomed_layer_id in layers_to_remove { - let doomed_layer = layers.get_with_id(doomed_layer_id); - doomed_layer.delete()?; - layers.remove(doomed_layer_id); - match ( - doomed_layer.is_dropped(), - doomed_layer.get_seg_tag().rel.is_relation(), - ) { - (true, true) => result.ondisk_relfiles_dropped += 1, - (true, false) => result.ondisk_nonrelfiles_dropped += 1, - (false, true) => result.ondisk_relfiles_removed += 1, - (false, false) => result.ondisk_nonrelfiles_removed += 1, + if let Some(doomed_layer) = layers.get_with_id(doomed_layer_id) { + + doomed_layer.delete()?; + layers.remove(doomed_layer_id); + match ( + doomed_layer.is_dropped(), + doomed_layer.get_seg_tag().rel.is_relation(), + ) { + (true, true) => result.ondisk_relfiles_dropped += 1, + (true, false) => result.ondisk_nonrelfiles_dropped += 1, + (false, true) => result.ondisk_relfiles_removed += 1, + (false, false) => result.ondisk_nonrelfiles_removed += 1, + } } } + let mut guard = self.last_gc.lock().unwrap(); + *guard = Some(cutoff); + result.elapsed = now.elapsed(); Ok(result) } @@ -1806,9 +1913,10 @@ impl<'a> TimelineWriter for LayeredTimelineWriter<'a> { let seg = SegmentTag::from_blknum(rel, blknum); let layer = self.tl.get_layer_for_write(seg, lsn)?; - let delta_size = layer.put_wal_record(lsn, blknum, rec); + let delta_size = layer.upgrade_to_inmemory_layer().unwrap().put_wal_record(lsn, blknum, rec); self.tl .increase_current_logical_size(delta_size * BLCKSZ as u32); + Ok(()) } @@ -1825,7 +1933,7 @@ impl<'a> TimelineWriter for LayeredTimelineWriter<'a> { let seg = SegmentTag::from_blknum(rel, blknum); let layer = self.tl.get_layer_for_write(seg, lsn)?; - let delta_size = layer.put_page_image(blknum, lsn, img); + let delta_size = layer.upgrade_to_inmemory_layer().unwrap().put_page_image(blknum, lsn, img); self.tl .increase_current_logical_size(delta_size * BLCKSZ as u32); @@ -1870,7 +1978,7 @@ impl<'a> TimelineWriter for LayeredTimelineWriter<'a> { 
}; let layer = self.tl.get_layer_for_write(seg, lsn)?; - layer.drop_segment(lsn); + layer.upgrade_to_inmemory_layer().unwrap().drop_segment(lsn); } // Truncate the last remaining segment to the specified size @@ -1880,7 +1988,7 @@ impl<'a> TimelineWriter for LayeredTimelineWriter<'a> { segno: last_remain_seg, }; let layer = self.tl.get_layer_for_write(seg, lsn)?; - layer.put_truncation(lsn, relsize % RELISH_SEG_SIZE) + layer.upgrade_to_inmemory_layer().unwrap().put_truncation(lsn, relsize % RELISH_SEG_SIZE) } self.tl .decrease_current_logical_size((oldsize - relsize) * BLCKSZ as u32); @@ -1908,7 +2016,7 @@ impl<'a> TimelineWriter for LayeredTimelineWriter<'a> { segno: remove_segno, }; let layer = self.tl.get_layer_for_write(seg, lsn)?; - layer.drop_segment(lsn); + layer.upgrade_to_inmemory_layer().unwrap().drop_segment(lsn); } self.tl .decrease_current_logical_size(oldsize * BLCKSZ as u32); @@ -1922,7 +2030,7 @@ impl<'a> TimelineWriter for LayeredTimelineWriter<'a> { // TODO handle TwoPhase relishes let seg = SegmentTag::from_blknum(rel, 0); let layer = self.tl.get_layer_for_write(seg, lsn)?; - layer.drop_segment(lsn); + layer.upgrade_to_inmemory_layer().unwrap().drop_segment(lsn); } Ok(()) diff --git a/pageserver/src/layered_repository/delta_layer.rs b/pageserver/src/layered_repository/delta_layer.rs index 6104155dc5..8fcba1becd 100644 --- a/pageserver/src/layered_repository/delta_layer.rs +++ b/pageserver/src/layered_repository/delta_layer.rs @@ -151,6 +151,10 @@ pub struct DeltaLayerInner { } impl Layer for DeltaLayer { + fn get_tenant_id(&self) -> ZTenantId { + self.tenantid + } + fn get_timeline_id(&self) -> ZTimelineId { self.timelineid } diff --git a/pageserver/src/layered_repository/image_layer.rs b/pageserver/src/layered_repository/image_layer.rs index 9d977a9155..26780b7404 100644 --- a/pageserver/src/layered_repository/image_layer.rs +++ b/pageserver/src/layered_repository/image_layer.rs @@ -120,6 +120,10 @@ impl Layer for ImageLayer { 
PathBuf::from(self.layer_name().to_string()) } + fn get_tenant_id(&self) -> ZTenantId { + self.tenantid + } + fn get_timeline_id(&self) -> ZTimelineId { self.timelineid } diff --git a/pageserver/src/layered_repository/inmemory_layer.rs b/pageserver/src/layered_repository/inmemory_layer.rs index 474eef09c4..aefd8cd2b7 100644 --- a/pageserver/src/layered_repository/inmemory_layer.rs +++ b/pageserver/src/layered_repository/inmemory_layer.rs @@ -90,6 +90,11 @@ impl InMemoryLayerInner { } impl Layer for InMemoryLayer { + + fn upgrade_to_inmemory_layer(&self) -> Option<&InMemoryLayer> { + Some(self) + } + // An in-memory layer doesn't really have a filename as it's not stored on disk, // but we construct a filename as if it was a delta layer fn filename(&self) -> PathBuf { @@ -113,6 +118,10 @@ impl Layer for InMemoryLayer { PathBuf::from(format!("inmem-{}", delta_filename)) } + fn get_tenant_id(&self) -> ZTenantId { + self.tenantid + } + fn get_timeline_id(&self) -> ZTimelineId { self.timelineid } diff --git a/pageserver/src/layered_repository/jobs.rs b/pageserver/src/layered_repository/jobs.rs new file mode 100644 index 0000000000..a5aeae1051 --- /dev/null +++ b/pageserver/src/layered_repository/jobs.rs @@ -0,0 +1,129 @@ +use crate::tenant_mgr; +use crate::layered_repository::layer_map; +use crate::PageServerConf; + +use anyhow::Result; +use lazy_static::lazy_static; +use tracing::*; + +use std::collections::VecDeque; +use std::sync::Mutex; +use std::thread::JoinHandle; + +use zenith_utils::zid::{ZTenantId, ZTimelineId}; + +lazy_static! 
{ + static ref JOB_QUEUE: Mutex = Mutex::new(GlobalJobQueue::default()); +} + + +#[derive(Default)] +struct GlobalJobQueue { + jobs: VecDeque, +} + +pub enum GlobalJob { + // To release memory + EvictSomeLayer, + + // To advance 'disk_consistent_lsn' + CheckpointTimeline(ZTenantId, ZTimelineId), + + // To free up disk space + GarbageCollect(ZTenantId), +} + +pub fn schedule_job(job: GlobalJob) { + let mut queue = JOB_QUEUE.lock().unwrap(); + queue.jobs.push_back(job); +} + +/// +/// Launch the global job handler thread +/// +/// TODO: This ought to be a pool of threads +/// +pub fn launch_global_job_thread(conf: &'static PageServerConf) -> JoinHandle<()> { + std::thread::Builder::new() + .name("Global Job thread".into()) + .spawn(move || { + // FIXME: relaunch it? Panic is not good. + + global_job_loop(conf).expect("Global job thread died"); + }) + .unwrap() +} + +pub fn global_job_loop(conf: &'static PageServerConf) -> Result<()> { + while !tenant_mgr::shutdown_requested() { + std::thread::sleep(conf.checkpoint_period); + info!("global job thread waking up"); + + let mut queue = JOB_QUEUE.lock().unwrap(); + while let Some(job) = queue.jobs.pop_front() { + drop(queue); + + let result = match job { + GlobalJob::EvictSomeLayer => { + evict_layer() + }, + GlobalJob::CheckpointTimeline(tenantid, timelineid) => { + checkpoint_timeline(tenantid, timelineid) + } + GlobalJob::GarbageCollect(tenantid) => { + gc_tenant(tenantid) + } + }; + + if let Err(err) = result { + error!("job ended in error: {:#}", err); + } + + queue = JOB_QUEUE.lock().unwrap(); + } + } + trace!("Checkpointer thread shut down"); + Ok(()) +} + +// Freeze and write out an in-memory layer +fn evict_layer() -> Result<()> +{ + // Pick a victim + while let Some(layer_id) = layer_map::find_victim() { + let victim_layer = match layer_map::get_layer(layer_id) { + Some(l) => l, + None => continue, + }; + + let tenantid = victim_layer.get_tenant_id(); + let timelineid = victim_layer.get_timeline_id(); + + let 
_entered = info_span!("global evict", timeline = %timelineid, tenant = %tenantid) + .entered(); + + info!("evicting {}", victim_layer.filename().display()); + + drop(victim_layer); + + let timeline = tenant_mgr::get_timeline_for_tenant(tenantid, timelineid)?; + + timeline.upgrade_to_layered_timeline().evict_layer(layer_id)? + } + info!("no more eviction needed"); + Ok(()) +} + +fn checkpoint_timeline(tenantid: ZTenantId, timelineid: ZTimelineId) -> Result<()> { + let timeline = tenant_mgr::get_timeline_for_tenant(tenantid, timelineid)?; + + timeline.checkpoint_scheduled() +} + +fn gc_tenant(tenantid: ZTenantId) -> Result<()> { + let tenant = tenant_mgr::get_repository_for_tenant(tenantid)?; + + tenant.gc_scheduled()?; + + Ok(()) +} diff --git a/pageserver/src/layered_repository/layer_map.rs b/pageserver/src/layered_repository/layer_map.rs index 7d3027de2a..e759cfc0c3 100644 --- a/pageserver/src/layered_repository/layer_map.rs +++ b/pageserver/src/layered_repository/layer_map.rs @@ -25,6 +25,7 @@ +use crate::layered_repository::{schedule_job, GlobalJob}; use crate::layered_repository::interval_tree::{IntervalItem, IntervalIter, IntervalTree}; use crate::layered_repository::storage_layer::{Layer, SegmentTag}; use crate::layered_repository::InMemoryLayer; @@ -34,6 +35,7 @@ use lazy_static::lazy_static; use std::cmp::Ordering; use std::collections::{BinaryHeap, HashMap}; use std::sync::{Arc, Mutex}; +use tracing::*; use zenith_metrics::{register_int_gauge, IntGauge}; use zenith_utils::lsn::Lsn; @@ -49,86 +51,172 @@ lazy_static! 
{ static ref LAYERS: Mutex = Mutex::new(GlobalLayerMap::new()); } -const MAX_LOADED_LAYERS: usize = 10; +const MAX_OPEN_LAYERS: usize = 10; -#[derive(Clone)] -enum GlobalLayerEntry { - InMemory(Arc), - Historic(Arc), +const MAX_LOADED_LAYERS: usize = 100; + +struct GlobalLayerEntry { + tag: u64, // to fix ABA problem + layer: Option>, + usage_count: u32, } struct GlobalLayerMap { - layers: HashMap, - last_id: u64, + open_layers: Vec, + clock_arm: u32, - // Layers currently loaded. We run a clock algorithm across these. - loaded_layers: Vec, + num_open_layers: usize, + eviction_scheduled: bool, + + historic_layers: Vec, } impl GlobalLayerMap { pub fn new() -> GlobalLayerMap { GlobalLayerMap { - layers: HashMap::new(), - last_id: 0, - loaded_layers: Vec::new(), + open_layers: Vec::new(), + clock_arm: 0, + historic_layers: Vec::new(), + eviction_scheduled: false, + num_open_layers: 0, } } - pub fn get(&mut self, layer_id: LayerId) -> Arc { - - match self.layers.get(&layer_id) { - Some(GlobalLayerEntry::InMemory(layer)) => layer.clone(), - Some(GlobalLayerEntry::Historic(layer)) => layer.clone(), - None => panic!() + pub fn get(&mut self, layer_id: LayerId) -> Option> { + let e = if layer_id.is_historic() { + let idx = (layer_id.index - 1) as usize; + &mut self.historic_layers[idx] + } else { + let idx = ((-layer_id.index) - 1) as usize; + &mut self.open_layers[idx] + }; + if e.usage_count < 5 { + e.usage_count += 1; } - } - pub fn get_open(&mut self, layer_id: LayerId) -> Arc { - match self.layers.get(&layer_id) { - Some(GlobalLayerEntry::InMemory(layer)) => layer.clone(), - Some(GlobalLayerEntry::Historic(_layer)) => panic!(), - None => panic!() - } + e.layer.clone() } pub fn insert_open(&mut self, layer: Arc) -> LayerId { - let layer_id = LayerId(self.last_id); - self.last_id += 1; + let index = -(self.open_layers.len() as isize + 1); - self.layers.insert(layer_id, GlobalLayerEntry::InMemory(layer)); + let entry = GlobalLayerEntry { + layer: Some(layer), + 
usage_count: 1, + tag: 1, + }; + let tag = entry.tag; + self.open_layers.push(entry); + self.num_open_layers += 1; - layer_id + NUM_INMEMORY_LAYERS.inc(); + + if !self.eviction_scheduled && self.num_open_layers >= MAX_OPEN_LAYERS { + info!("scheduling global eviction"); + schedule_job(GlobalJob::EvictSomeLayer); + self.eviction_scheduled = true; + } + + LayerId { + index, + tag, + } } pub fn insert_historic(&mut self, layer: Arc) -> LayerId { - let layer_id = LayerId(self.last_id); - self.last_id += 1; + let index = self.historic_layers.len() as isize + 1; - self.layers.insert(layer_id, GlobalLayerEntry::Historic(layer)); + let entry = GlobalLayerEntry { + layer: Some(layer), + usage_count: 1, + tag: 1, + }; + let tag = entry.tag; + self.historic_layers.push(entry); - layer_id + NUM_ONDISK_LAYERS.inc(); + + LayerId { + index, + tag, + } } - pub fn remove(&mut self, layer_id: LayerId) -> GlobalLayerEntry { - if let Some(entry) = self.layers.remove(&layer_id) { - let orig_entry = entry.clone(); - match orig_entry { - GlobalLayerEntry::InMemory(_layer) => { - NUM_INMEMORY_LAYERS.dec(); - }, - GlobalLayerEntry::Historic(_layer) => { - NUM_ONDISK_LAYERS.dec(); - } + pub fn remove(&mut self, layer_id: LayerId) -> Option> { + let old_layer; + + if layer_id.is_historic() { + let idx = (layer_id.index - 1) as usize; + old_layer = self.historic_layers[idx].layer.take(); + if old_layer.is_some() { + NUM_ONDISK_LAYERS.dec(); } - entry.clone() } else { - panic!() + let idx = ((-layer_id.index) - 1) as usize; + old_layer = self.open_layers[idx].layer.take(); + if old_layer.is_some() { + NUM_INMEMORY_LAYERS.dec(); + self.num_open_layers -= 1; + } } + old_layer + } + + pub fn find_victim(&mut self) -> Option { + // run the clock algorithm among all open layers + for _ in 0..self.open_layers.len() * 5 { + self.clock_arm += 1; + if self.clock_arm >= self.open_layers.len() as u32 { + self.clock_arm = 0; + } + let next = self.clock_arm as usize; + + if 
self.open_layers[next].usage_count == 0 { + return Some(LayerId { + index: -((next + 1) as isize), + tag: self.open_layers[next].tag, + }); + } else { + self.open_layers[next].usage_count -= 1; + } + } + None } } -#[derive(Clone, Copy, PartialEq, Eq, Hash)] -pub struct LayerId(u64); +pub fn find_victim() -> Option { + let mut l = LAYERS.lock().unwrap(); + + if l.num_open_layers >= MAX_OPEN_LAYERS { + if let Some(x) = l.find_victim() { + info!("found victim out of {} open layers", l.num_open_layers); + Some(x) + } else { + info!("no victim found at {} open layers", l.num_open_layers); + None + } + } else { + info!("no victim needed at {} open layers", l.num_open_layers); + l.eviction_scheduled = false; + None + } +} + +pub fn get_layer(layer_id: LayerId) -> Option> { + LAYERS.lock().unwrap().get(layer_id) +} + +#[derive(Clone, Copy, PartialEq, Eq)] +pub struct LayerId { + index: isize, + tag: u64 +} + +impl LayerId { + pub fn is_historic(&self) -> bool { + self.index > 0 + } +} /// /// LayerMap tracks what layers exist on a timeline. @@ -161,7 +249,7 @@ impl LayerMap { segentry.get(lsn) } - pub fn get_with_id(&self, layer_id: LayerId) -> Arc { + pub fn get_with_id(&self, layer_id: LayerId) -> Option> { // TODO: check that it belongs to this tenant+timeline LAYERS.lock().unwrap().get(layer_id) } @@ -170,15 +258,11 @@ impl LayerMap { /// Get the open layer for given segment for writing. Or None if no open /// layer exists. 
/// - pub fn get_open(&self, tag: &SegmentTag) -> Option> { + pub fn get_open(&self, tag: &SegmentTag) -> Option> { let segentry = self.segs.get(tag)?; - - if let Some((layer_id, _start_lsn)) = segentry.open { - Some(LAYERS.lock().unwrap().get_open(layer_id)) - } else { - None - } + let (layer_id, _start_lsn) = segentry.open?; + LAYERS.lock().unwrap().get(layer_id) } /// @@ -206,24 +290,19 @@ impl LayerMap { generation: self.current_generation, }; self.open_layers.push(open_layer_entry); - - NUM_INMEMORY_LAYERS.inc(); } - /// Remove the oldest in-memory layer + /// Remove a layer pub fn remove(&mut self, layer_id: LayerId) { - let layer_entry = LAYERS.lock().unwrap().remove(layer_id); - - // Also remove it from the SegEntry of this segment - match layer_entry { - GlobalLayerEntry::InMemory(layer) => { + if let Some(layer) = LAYERS.lock().unwrap().remove(layer_id) { + // Also remove it from the SegEntry of this segment + if layer_id.is_historic() { let tag = layer.get_seg_tag(); if let Some(segentry) = self.segs.get_mut(&tag) { segentry.historic.remove(&HistoricLayerIntervalTreeEntry::new(layer_id, layer)); } - } - GlobalLayerEntry::Historic(layer) => { + } else { let segtag = layer.get_seg_tag(); let mut segentry = self.segs.get_mut(&segtag).unwrap(); if let Some(open) = segentry.open { @@ -311,15 +390,18 @@ impl LayerMap { } /// Return the oldest in-memory layer, along with its generation number. 
- pub fn peek_oldest_open(&self) -> Option<(LayerId, Arc, u64)> { + pub fn peek_oldest_open(&mut self) -> Option<(LayerId, Arc, u64)> { - if let Some(oldest_entry) = self.open_layers.peek() { - Some((oldest_entry.layer_id, - LAYERS.lock().unwrap().get_open(oldest_entry.layer_id), - oldest_entry.generation)) - } else { - None + while let Some(oldest_entry) = self.open_layers.peek() { + if let Some(layer) = LAYERS.lock().unwrap().get(oldest_entry.layer_id) { + return Some((oldest_entry.layer_id, + layer, + oldest_entry.generation)); + } else { + self.open_layers.pop(); + } } + None } /// Increment the generation number used to stamp open in-memory layers. Layers @@ -417,14 +499,15 @@ impl SegEntry { pub fn get(&self, lsn: Lsn) -> Option> { if let Some(open) = &self.open { - let layer = LAYERS.lock().unwrap().get(open.0); - if layer.get_start_lsn() <= lsn { - return Some(layer); + if let Some(layer) = LAYERS.lock().unwrap().get(open.0) { + if layer.get_start_lsn() <= lsn { + return Some(layer); + } } } if let Some(historic) = self.historic.search(lsn) { - Some(LAYERS.lock().unwrap().get(historic.layer_id)) + Some(LAYERS.lock().unwrap().get(historic.layer_id).unwrap()) } else { None } @@ -437,7 +520,7 @@ impl SegEntry { self.historic .iter_newer(lsn) .any(|e| { - let layer = LAYERS.lock().unwrap().get(e.layer_id); + let layer = LAYERS.lock().unwrap().get(e.layer_id).unwrap(); !layer.is_incremental() } ) @@ -504,7 +587,7 @@ impl<'a> Iterator for HistoricLayerIter<'a> { loop { if let Some(x) = &mut self.iter { if let Some(x) = x.next() { - let layer = LAYERS.lock().unwrap().get(x.layer_id); + let layer = LAYERS.lock().unwrap().get(x.layer_id).unwrap(); return Some((x.layer_id, layer)); } } diff --git a/pageserver/src/layered_repository/storage_layer.rs b/pageserver/src/layered_repository/storage_layer.rs index 0a86fe407d..2c34fb7a6c 100644 --- a/pageserver/src/layered_repository/storage_layer.rs +++ b/pageserver/src/layered_repository/storage_layer.rs @@ -2,9 +2,10 @@ 
//! Common traits and structs for layers //! +use crate::layered_repository::InMemoryLayer; use crate::relish::RelishTag; use crate::repository::WALRecord; -use crate::ZTimelineId; +use crate::{ZTenantId, ZTimelineId}; use anyhow::Result; use bytes::Bytes; use serde::{Deserialize, Serialize}; @@ -104,6 +105,14 @@ pub enum PageReconstructResult { /// in-memory and on-disk layers. /// pub trait Layer: Send + Sync { + + fn upgrade_to_inmemory_layer(&self) -> Option<&InMemoryLayer> { + None + } + + /// Identify the tenant this relish belongs to + fn get_tenant_id(&self) -> ZTenantId; + /// Identify the timeline this relish belongs to fn get_timeline_id(&self) -> ZTimelineId; diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index d78974aebb..6471e56a9a 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -690,7 +690,7 @@ impl postgres_backend::Handler for PageServerHandler { let repo = tenant_mgr::get_repository_for_tenant(tenantid)?; - let result = repo.gc_iteration(Some(timelineid), gc_horizon, true)?; + let result = repo.gc_manual(Some(timelineid), gc_horizon, true)?; pgb.write_message_noflush(&BeMessage::RowDescription(&[ RowDescriptor::int8_col(b"layer_relfiles_total"), diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index c1bdb87944..68335faac2 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -33,12 +33,16 @@ pub trait Repository: Send + Sync { /// `checkpoint_before_gc` parameter is used to force compaction of storage before CG /// to make tests more deterministic. /// TODO Do we still need it or we can call checkpoint explicitly in tests where needed? 
- fn gc_iteration( + fn gc_manual( &self, timelineid: Option, horizon: u64, checkpoint_before_gc: bool, ) -> Result; + + fn gc_scheduled(&self) -> Result; + + fn upgrade_to_layered_repository(&self) -> &crate::layered_repository::LayeredRepository; } /// @@ -144,7 +148,9 @@ pub trait Timeline: Send + Sync { /// /// NOTE: This has nothing to do with checkpoint in PostgreSQL. We don't /// know anything about them here in the repository. - fn checkpoint(&self) -> Result<()>; + fn checkpoint_forced(&self) -> Result<()>; + + fn checkpoint_scheduled(&self) -> Result<()>; /// Retrieve current logical size of the timeline /// @@ -155,6 +161,8 @@ pub trait Timeline: Send + Sync { /// Does the same as get_current_logical_size but counted on demand. /// Used in tests to ensure thet incremental and non incremental variants match. fn get_current_logical_size_non_incremental(&self, lsn: Lsn) -> Result; + + fn upgrade_to_layered_timeline(&self) -> &crate::layered_repository::LayeredTimeline; } /// Various functions to mutate the timeline. @@ -714,8 +722,8 @@ mod tests { .contains(&TESTREL_A)); // Run checkpoint and garbage collection and check that it's still not visible - newtline.checkpoint()?; - repo.gc_iteration(Some(NEW_TIMELINE_ID), 0, true)?; + newtline.checkpoint_forced()?; + repo.gc_manual(Some(NEW_TIMELINE_ID), 0, true)?; assert!(!newtline .list_rels(0, TESTDB, Lsn(0x40))? 
diff --git a/pageserver/src/tenant_mgr.rs b/pageserver/src/tenant_mgr.rs index ccd75d6fcd..2b01276158 100644 --- a/pageserver/src/tenant_mgr.rs +++ b/pageserver/src/tenant_mgr.rs @@ -106,17 +106,6 @@ fn init_repo(conf: &'static PageServerConf, tenant_id: ZTenantId) { true, )); - let checkpointer_handle = LayeredRepository::launch_checkpointer_thread(conf, repo.clone()); - let gc_handle = LayeredRepository::launch_gc_thread(conf, repo.clone()); - - let mut handles = TENANT_HANDLES.lock().unwrap(); - let h = TenantHandleEntry { - checkpointer_handle: Some(checkpointer_handle), - gc_handle: Some(gc_handle), - }; - - handles.insert(tenant_id, h); - let mut m = access_tenants(); let tenant = m.get_mut(&tenant_id).unwrap(); tenant.repo = Some(repo); diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 65b3fa5cf6..45df5f2a9a 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -251,6 +251,9 @@ fn walreceiver_main( last_rec_lsn = lsn; } + timeline.upgrade_to_layered_timeline().schedule_checkpoint_if_needed()?; + timeline.upgrade_to_layered_timeline().schedule_gc_if_needed()?; + if !caught_up && endlsn >= end_of_wal { info!("caught up at LSN {}", endlsn); caught_up = true;