Compare commits

...

5 Commits

Author SHA1 Message Date
Heikki Linnakangas
ea56b4a36a WIP: global job queue 2021-10-29 14:00:41 +03:00
Heikki Linnakangas
7cf7215ce2 WIP: Use poll() 2021-10-28 16:23:51 +03:00
Heikki Linnakangas
23713eb44f WIP: cache vfds 2021-10-28 01:03:34 +03:00
Heikki Linnakangas
28675739de WIP 2021-10-27 19:00:40 +03:00
Heikki Linnakangas
ea90d102e2 Refactor 'zenith' CLI subcommand handling
Also fixes 'zenith safekeeper restart -m immediate'. The stop-mode was
previously ignored.
2021-10-27 12:01:21 +03:00
21 changed files with 1196 additions and 530 deletions

3
Cargo.lock generated
View File

@@ -208,8 +208,6 @@ dependencies = [
[[package]] [[package]]
name = "bookfile" name = "bookfile"
version = "0.3.0" version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "efa3e2086414e1bbecbc10730f265e5b079ab4ea0b830e7219a70dab6471e753"
dependencies = [ dependencies = [
"aversion", "aversion",
"byteorder", "byteorder",
@@ -1195,6 +1193,7 @@ dependencies = [
"hyper", "hyper",
"lazy_static", "lazy_static",
"log", "log",
"nix",
"postgres", "postgres",
"postgres-protocol", "postgres-protocol",
"postgres-types", "postgres-types",

View File

@@ -5,7 +5,7 @@ authors = ["Stas Kelvich <stas@zenith.tech>"]
edition = "2018" edition = "2018"
[dependencies] [dependencies]
bookfile = "^0.3" bookfile = { path = "../../bookfile" }
chrono = "0.4.19" chrono = "0.4.19"
rand = "0.8.3" rand = "0.8.3"
regex = "1.4.5" regex = "1.4.5"
@@ -37,6 +37,7 @@ async-trait = "0.1"
const_format = "0.2.21" const_format = "0.2.21"
tracing = "0.1.27" tracing = "0.1.27"
signal-hook = {version = "0.3.10", features = ["extended-siginfo"] } signal-hook = {version = "0.3.10", features = ["extended-siginfo"] }
nix = "0.23"
postgres_ffi = { path = "../postgres_ffi" } postgres_ffi = { path = "../postgres_ffi" }
zenith_metrics = { path = "../zenith_metrics" } zenith_metrics = { path = "../zenith_metrics" }

View File

@@ -28,6 +28,7 @@ use daemonize::Daemonize;
use pageserver::{ use pageserver::{
branches, defaults::*, http, page_service, relish_storage, tenant_mgr, PageServerConf, branches, defaults::*, http, page_service, relish_storage, tenant_mgr, PageServerConf,
layered_repository,
RelishStorageConfig, RelishStorageKind, S3Config, LOG_FILE_NAME, RelishStorageConfig, RelishStorageKind, S3Config, LOG_FILE_NAME,
}; };
use zenith_utils::http::endpoint; use zenith_utils::http::endpoint;
@@ -549,6 +550,8 @@ fn start_pageserver(conf: &'static PageServerConf) -> Result<()> {
page_service::thread_main(conf, auth, pageserver_listener, conf.auth_type) page_service::thread_main(conf, auth, pageserver_listener, conf.auth_type)
})?; })?;
let global_job_thread = layered_repository::launch_global_job_thread(conf);
for info in SignalsInfo::<WithOrigin>::new(TERM_SIGNALS)?.into_iter() { for info in SignalsInfo::<WithOrigin>::new(TERM_SIGNALS)?.into_iter() {
match info.signal { match info.signal {
SIGQUIT => { SIGQUIT => {
@@ -577,6 +580,12 @@ fn start_pageserver(conf: &'static PageServerConf) -> Result<()> {
.expect("thread panicked") .expect("thread panicked")
.expect("thread exited with an error"); .expect("thread exited with an error");
} }
// Shut down global job thread
global_job_thread
.join()
.expect("thread panicked");
info!("Pageserver shut down successfully completed"); info!("Pageserver shut down successfully completed");
exit(0); exit(0);
} }

View File

@@ -230,7 +230,7 @@ fn bootstrap_timeline(
timeline.writer().as_ref(), timeline.writer().as_ref(),
lsn, lsn,
)?; )?;
timeline.checkpoint()?; timeline.checkpoint_forced()?;
println!( println!(
"created initial timeline {} timeline.lsn {}", "created initial timeline {} timeline.lsn {}",

View File

@@ -59,6 +59,7 @@ mod image_layer;
mod inmemory_layer; mod inmemory_layer;
mod interval_tree; mod interval_tree;
mod layer_map; mod layer_map;
mod jobs;
mod page_versions; mod page_versions;
mod storage_layer; mod storage_layer;
@@ -66,10 +67,12 @@ use delta_layer::DeltaLayer;
use image_layer::ImageLayer; use image_layer::ImageLayer;
use inmemory_layer::InMemoryLayer; use inmemory_layer::InMemoryLayer;
use layer_map::LayerMap; use layer_map::{LayerId, LayerMap};
use storage_layer::{ use storage_layer::{
Layer, PageReconstructData, PageReconstructResult, SegmentTag, RELISH_SEG_SIZE, Layer, PageReconstructData, PageReconstructResult, SegmentTag, RELISH_SEG_SIZE,
}; };
use jobs::{GlobalJob, schedule_job};
pub use jobs::launch_global_job_thread;
static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; 8192]); static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; 8192]);
@@ -129,10 +132,17 @@ pub struct LayeredRepository {
/// Makes evey repo's timelines to backup their files to remote storage, /// Makes evey repo's timelines to backup their files to remote storage,
/// when they get frozen. /// when they get frozen.
upload_relishes: bool, upload_relishes: bool,
is_gc_scheduled: Mutex<bool>,
} }
/// Public interface /// Public interface
impl Repository for LayeredRepository { impl Repository for LayeredRepository {
/// Returns this repository as a concrete `LayeredRepository` reference, so
/// that code holding it through the `Repository` trait can reach
/// `LayeredRepository`-specific state.
fn upgrade_to_layered_repository(&self) -> &crate::layered_repository::LayeredRepository {
self
}
fn get_timeline(&self, timelineid: ZTimelineId) -> Result<Arc<dyn Timeline>> { fn get_timeline(&self, timelineid: ZTimelineId) -> Result<Arc<dyn Timeline>> {
let mut timelines = self.timelines.lock().unwrap(); let mut timelines = self.timelines.lock().unwrap();
@@ -206,19 +216,33 @@ impl Repository for LayeredRepository {
/// Public entry point to GC. All the logic is in the private /// Public entry point to GC. All the logic is in the private
/// gc_iteration_internal function, this public facade just wraps it for /// gc_iteration_internal function, this public facade just wraps it for
/// metrics collection. /// metrics collection.
fn gc_iteration( fn gc_manual(
&self, &self,
target_timelineid: Option<ZTimelineId>, target_timelineid: Option<ZTimelineId>,
horizon: u64, horizon: u64,
checkpoint_before_gc: bool, checkpoint_before_gc: bool,
) -> Result<GcResult> { ) -> Result<GcResult> {
STORAGE_TIME STORAGE_TIME
.with_label_values(&["gc"]) .with_label_values(&["gc_manual"])
.observe_closure_duration(|| { .observe_closure_duration(|| {
self.gc_iteration_internal(target_timelineid, horizon, checkpoint_before_gc) self.gc_iteration_internal(target_timelineid, horizon, checkpoint_before_gc)
}) })
} }
/// Runs one GC iteration with no target timeline (`None`), the configured
/// `gc_horizon`, and no checkpoint beforehand, recording its duration under
/// the "gc_scheduled" metrics label.
///
/// Afterwards the `is_gc_scheduled` flag is cleared, whether or not the GC
/// iteration succeeded, and the iteration's result is returned.
fn gc_scheduled(&self) -> Result<GcResult> {
    let gc_outcome = STORAGE_TIME
        .with_label_values(&["gc_scheduled"])
        .observe_closure_duration(|| {
            self.gc_iteration_internal(None, self.conf.gc_horizon, false)
        });

    // This run is over; allow the next GC to be scheduled.
    *self.is_gc_scheduled.lock().unwrap() = false;

    gc_outcome
}
// Wait for all threads to complete and persist repository data before pageserver shutdown. // Wait for all threads to complete and persist repository data before pageserver shutdown.
fn shutdown(&self) -> Result<()> { fn shutdown(&self) -> Result<()> {
trace!("LayeredRepository shutdown for tenant {}", self.tenantid); trace!("LayeredRepository shutdown for tenant {}", self.tenantid);
@@ -228,7 +252,7 @@ impl Repository for LayeredRepository {
walreceiver::stop_wal_receiver(*timelineid); walreceiver::stop_wal_receiver(*timelineid);
// Wait for syncing data to disk // Wait for syncing data to disk
trace!("repo shutdown. checkpoint timeline {}", timelineid); trace!("repo shutdown. checkpoint timeline {}", timelineid);
timeline.checkpoint()?; timeline.checkpoint_forced()?;
//TODO Wait for walredo process to shutdown too //TODO Wait for walredo process to shutdown too
} }
@@ -306,6 +330,7 @@ impl LayeredRepository {
timelines: Mutex::new(HashMap::new()), timelines: Mutex::new(HashMap::new()),
walredo_mgr, walredo_mgr,
upload_relishes, upload_relishes,
is_gc_scheduled: Mutex::new(false),
} }
} }
@@ -355,44 +380,6 @@ impl LayeredRepository {
Ok(()) Ok(())
} }
///
/// Spawn a thread named "GC thread" that runs the GC loop of the given
/// repository, and return its join handle.
///
/// The spawned thread panics if the GC loop returns an error. This function
/// itself panics if the thread cannot be spawned.
///
pub fn launch_gc_thread(
    conf: &'static PageServerConf,
    rc: Arc<LayeredRepository>,
) -> JoinHandle<()> {
    let gc_main = move || {
        // FIXME: relaunch it? Panic is not good.
        rc.gc_loop(conf).expect("GC thread died");
    };

    let builder = std::thread::Builder::new().name("GC thread".into());
    builder.spawn(gc_main).unwrap()
}
///
/// GC thread's main loop.
///
/// Until shutdown is requested: run one GC iteration (only when
/// `conf.gc_horizon` is non-zero), then sleep for `conf.gc_period`, checking
/// for a shutdown request once per second.
///
/// # Errors
///
/// Returns the error of a failed GC iteration; the loop stops at that point.
///
fn gc_loop(&self, conf: &'static PageServerConf) -> Result<()> {
    while !tenant_mgr::shutdown_requested() {
        // Garbage collect old files that are not needed for PITR anymore
        if conf.gc_horizon > 0 {
            // Propagate the failure to the caller instead of panicking here;
            // this function already returns a Result.
            self.gc_iteration(None, conf.gc_horizon, false)?;
        }

        // TODO Write it in more adequate way using
        // condvar.wait_timeout() or something
        let mut sleep_time = conf.gc_period.as_secs();
        while sleep_time > 0 && !tenant_mgr::shutdown_requested() {
            sleep_time -= 1;
            std::thread::sleep(Duration::from_secs(1));
        }
        info!("gc thread for tenant {} waking up", self.tenantid);
    }
    Ok(())
}
/// Save timeline metadata to file /// Save timeline metadata to file
fn save_metadata( fn save_metadata(
conf: &'static PageServerConf, conf: &'static PageServerConf,
@@ -546,7 +533,7 @@ impl LayeredRepository {
// so that they too can be garbage collected. That's // so that they too can be garbage collected. That's
// used in tests, so we want as deterministic results as possible. // used in tests, so we want as deterministic results as possible.
if checkpoint_before_gc { if checkpoint_before_gc {
timeline.checkpoint()?; timeline.checkpoint_forced()?;
info!("timeline {} checkpoint_before_gc done", timelineid); info!("timeline {} checkpoint_before_gc done", timelineid);
} }
@@ -689,10 +676,18 @@ pub struct LayeredTimeline {
/// Must always be acquired before the layer map/individual layer lock /// Must always be acquired before the layer map/individual layer lock
/// to avoid deadlock. /// to avoid deadlock.
write_lock: Mutex<()>, write_lock: Mutex<()>,
is_checkpoint_scheduled: Mutex<bool>,
last_gc: Mutex<Option<Lsn>>,
} }
/// Public interface functions /// Public interface functions
impl Timeline for LayeredTimeline { impl Timeline for LayeredTimeline {
/// Returns this timeline as a concrete `LayeredTimeline` reference, so that
/// code holding it through the `Timeline` trait can reach
/// `LayeredTimeline`-specific functionality.
fn upgrade_to_layered_timeline(&self) -> &crate::layered_repository::LayeredTimeline {
self
}
fn get_ancestor_lsn(&self) -> Lsn { fn get_ancestor_lsn(&self) -> Lsn {
self.ancestor_lsn self.ancestor_lsn
} }
@@ -867,13 +862,25 @@ impl Timeline for LayeredTimeline {
/// Public entry point for checkpoint(). All the logic is in the private /// Public entry point for checkpoint(). All the logic is in the private
/// checkpoint_internal function, this public facade just wraps it for /// checkpoint_internal function, this public facade just wraps it for
/// metrics collection. /// metrics collection.
fn checkpoint(&self) -> Result<()> { fn checkpoint_forced(&self) -> Result<()> {
STORAGE_TIME STORAGE_TIME
.with_label_values(&["checkpoint_force"]) .with_label_values(&["checkpoint_forced"])
//pass checkpoint_distance=0 to force checkpoint //pass checkpoint_distance=0 to force checkpoint
.observe_closure_duration(|| self.checkpoint_internal(0, true)) .observe_closure_duration(|| self.checkpoint_internal(0, true))
} }
/// Runs a non-forced checkpoint using the configured `checkpoint_distance`,
/// recording its duration under the "checkpoint_scheduled" metrics label.
///
/// Afterwards the `is_checkpoint_scheduled` flag is cleared, whether or not
/// the checkpoint succeeded, and the checkpoint's result is returned.
fn checkpoint_scheduled(&self) -> Result<()> {
    let checkpoint_outcome = STORAGE_TIME
        .with_label_values(&["checkpoint_scheduled"])
        .observe_closure_duration(|| {
            self.checkpoint_internal(self.conf.checkpoint_distance, false)
        });

    // This run is over; allow the next checkpoint to be scheduled.
    *self.is_checkpoint_scheduled.lock().unwrap() = false;

    checkpoint_outcome
}
fn get_last_record_lsn(&self) -> Lsn { fn get_last_record_lsn(&self) -> Lsn {
self.last_record_lsn.load().last self.last_record_lsn.load().last
} }
@@ -977,6 +984,9 @@ impl LayeredTimeline {
upload_relishes, upload_relishes,
write_lock: Mutex::new(()), write_lock: Mutex::new(()),
is_checkpoint_scheduled: Mutex::new(false),
last_gc: Mutex::new(None),
}; };
Ok(timeline) Ok(timeline)
} }
@@ -1145,7 +1155,7 @@ impl LayeredTimeline {
/// ///
/// Get a handle to the latest layer for appending. /// Get a handle to the latest layer for appending.
/// ///
fn get_layer_for_write(&self, seg: SegmentTag, lsn: Lsn) -> Result<Arc<InMemoryLayer>> { fn get_layer_for_write(&self, seg: SegmentTag, lsn: Lsn) -> Result<Arc<dyn Layer>> {
let mut layers = self.layers.lock().unwrap(); let mut layers = self.layers.lock().unwrap();
assert!(lsn.is_aligned()); assert!(lsn.is_aligned());
@@ -1160,7 +1170,9 @@ impl LayeredTimeline {
// Do we have a layer open for writing already? // Do we have a layer open for writing already?
let layer; let layer;
if let Some(open_layer) = layers.get_open(&seg) { if let Some(open_layer_arc) = layers.get_open(&seg) {
let open_layer = open_layer_arc.upgrade_to_inmemory_layer().expect("open layer is not an in-memory layer");
if open_layer.get_start_lsn() > lsn { if open_layer.get_start_lsn() > lsn {
bail!("unexpected open layer in the future"); bail!("unexpected open layer in the future");
} }
@@ -1185,7 +1197,7 @@ impl LayeredTimeline {
lsn, lsn,
)?; )?;
} else { } else {
return Ok(open_layer); return Ok(open_layer_arc);
} }
} }
// No writeable layer for this relation. Create one. // No writeable layer for this relation. Create one.
@@ -1276,9 +1288,10 @@ impl LayeredTimeline {
// a lot of memory and/or aren't receiving much updates anymore. // a lot of memory and/or aren't receiving much updates anymore.
let mut disk_consistent_lsn = last_record_lsn; let mut disk_consistent_lsn = last_record_lsn;
let mut created_historics = false;
let mut layer_uploads = Vec::new(); let mut layer_uploads = Vec::new();
while let Some((oldest_layer, oldest_generation)) = layers.peek_oldest_open() { while let Some((oldest_layer_id, oldest_layer_arc, oldest_generation)) = layers.peek_oldest_open() {
let oldest_layer = oldest_layer_arc.upgrade_to_inmemory_layer().expect("open layer is not an in-memory layer");
let oldest_pending_lsn = oldest_layer.get_oldest_pending_lsn(); let oldest_pending_lsn = oldest_layer.get_oldest_pending_lsn();
if tenant_mgr::shutdown_requested() && !forced { if tenant_mgr::shutdown_requested() && !forced {
@@ -1307,62 +1320,26 @@ impl LayeredTimeline {
break; break;
} }
// Mark the layer as no longer accepting writes and record the end_lsn.
// This happens in-place, no new layers are created now.
// We call `get_last_record_lsn` again, which may be different from the
// original load, as we may have released the write lock since then.
oldest_layer.freeze(self.get_last_record_lsn());
// The layer is no longer open, update the layer map to reflect this.
// We will replace it with on-disk historics below.
layers.pop_oldest_open();
layers.insert_historic(oldest_layer.clone());
// Write the now-frozen layer to disk. That could take a while, so release the lock while do it
drop(layers); drop(layers);
drop(write_guard); drop(write_guard);
let new_historics = oldest_layer.write_to_disk(self)?; // Evict the layer
self.evict_layer(oldest_layer_id)?;
write_guard = self.write_lock.lock().unwrap(); write_guard = self.write_lock.lock().unwrap();
layers = self.layers.lock().unwrap(); layers = self.layers.lock().unwrap();
if !new_historics.is_empty() {
created_historics = true;
}
// Finally, replace the frozen in-memory layer with the new on-disk layers
layers.remove_historic(oldest_layer);
// Add the historics to the LayerMap
for delta_layer in new_historics.delta_layers {
layer_uploads.push(delta_layer.path());
layers.insert_historic(Arc::new(delta_layer));
}
for image_layer in new_historics.image_layers {
layer_uploads.push(image_layer.path());
layers.insert_historic(Arc::new(image_layer));
}
} }
// Call unload() on all frozen layers, to release memory. // Call unload() on all frozen layers, to release memory.
// This shouldn't be much memory, as only metadata is slurped // This shouldn't be much memory, as only metadata is slurped
// into memory. // into memory.
for layer in layers.iter_historic_layers() { for (_layer_id, layer) in layers.iter_historic_layers() {
layer.unload()?; layer.unload()?;
} }
drop(layers); drop(layers);
drop(write_guard); drop(write_guard);
if created_historics {
// We must fsync the timeline dir to ensure the directory entries for
// new layer files are durable
let timeline_dir =
File::open(self.conf.timeline_path(&self.timelineid, &self.tenantid))?;
timeline_dir.sync_all()?;
}
// If we were able to advance 'disk_consistent_lsn', save it the metadata file. // If we were able to advance 'disk_consistent_lsn', save it the metadata file.
// After crash, we will restart WAL streaming and processing from that point. // After crash, we will restart WAL streaming and processing from that point.
let old_disk_consistent_lsn = self.disk_consistent_lsn.load(); let old_disk_consistent_lsn = self.disk_consistent_lsn.load();
@@ -1408,6 +1385,131 @@ impl LayeredTimeline {
Ok(()) Ok(())
} }
/// Queue a `GlobalJob::CheckpointTimeline` job for this timeline if one is
/// not already pending and the oldest open layer has fallen too far behind.
///
/// "Too far behind" means the distance between the last record LSN and the
/// oldest pending LSN of the oldest open layer exceeds
/// `conf.checkpoint_distance`. When a job is queued, the
/// `is_checkpoint_scheduled` flag is set so that further calls return early.
pub fn schedule_checkpoint_if_needed(&self) -> Result<()> {
    let mut already_scheduled = self.is_checkpoint_scheduled.lock().unwrap();
    if *already_scheduled {
        return Ok(());
    }

    let last_record_lsn = self.last_record_lsn.load().last;

    let mut layers = self.layers.lock().unwrap();
    if let Some((_layer_id, oldest_arc, _generation)) = layers.peek_oldest_open() {
        let oldest_inmem = oldest_arc
            .upgrade_to_inmemory_layer()
            .expect("open layer is not an in-memory layer");

        let lag = last_record_lsn.widening_sub(oldest_inmem.get_oldest_pending_lsn());
        let needs_checkpoint = lag > self.conf.checkpoint_distance.into();
        if needs_checkpoint {
            schedule_job(GlobalJob::CheckpointTimeline(self.tenantid, self.timelineid));
            *already_scheduled = true;
        }
    }

    Ok(())
}
/// Queue a `GlobalJob::GarbageCollect` job for this timeline's tenant if
/// enough WAL has been processed since the last GC and no GC job is already
/// pending for the tenant.
///
/// GC is considered needed when this timeline has never been garbage
/// collected, or when the last record LSN is more than
/// `max(10 MiB, gc_horizon / 2)` past the cutoff of the previous GC.
///
/// Does nothing when `conf.gc_horizon` is 0.
///
/// # Errors
///
/// Returns an error if the tenant's repository cannot be looked up.
pub fn schedule_gc_if_needed(&self) -> Result<()> {
    /// Lower bound for the LSN distance that triggers a new GC run.
    const MIN_GC_DISTANCE: u64 = 10 * 1024 * 1024;

    // The per-tenant GC loop that this scheduling replaces only ran GC when
    // gc_horizon was non-zero. Keep that guard, so that a horizon of 0 never
    // queues a GC job with a zero horizon.
    if self.conf.gc_horizon == 0 {
        return Ok(());
    }

    let last_record_lsn = self.last_record_lsn.load().last;

    let gc_needed = {
        let last_gc = self.last_gc.lock().unwrap();
        match *last_gc {
            Some(last_gc_lsn) => {
                let distance = last_record_lsn.widening_sub(last_gc_lsn);
                distance > std::cmp::max(MIN_GC_DISTANCE, self.conf.gc_horizon / 2) as i128
            }
            // Never collected yet
            None => true,
        }
    };
    if !gc_needed {
        return Ok(());
    }

    // The "GC pending" flag lives in the repository, not in the timeline.
    let repo = tenant_mgr::get_repository_for_tenant(self.tenantid)?;
    let repo = repo.upgrade_to_layered_repository();

    let mut gc_scheduled = repo.is_gc_scheduled.lock().unwrap();
    if *gc_scheduled {
        return Ok(());
    }
    schedule_job(GlobalJob::GarbageCollect(self.tenantid));
    *gc_scheduled = true;

    Ok(())
}
/// Freeze the in-memory layer identified by `layer_id`, write it to disk,
/// and replace it in the layer map with the on-disk layers that were created.
///
/// Does nothing if the layer map has no layer with this id, or if that layer
/// is not an in-memory layer.
///
/// This function acquires `write_lock` and then the layer map lock itself,
/// so the caller must not hold either of them. Both locks are released while
/// the layer is being written to disk, and again before the timeline
/// directory is fsynced.
///
/// # Errors
///
/// Returns an error if writing the layer to disk fails, or if the timeline
/// directory cannot be opened or synced.
fn evict_layer(&self, layer_id: LayerId) -> Result<()> {
    let mut write_guard = self.write_lock.lock().unwrap();
    let mut layers = self.layers.lock().unwrap();

    let victim_layer_arc = match layers.get_with_id(layer_id) {
        Some(layer_arc) => layer_arc,
        None => return Ok(()),
    };
    let victim_layer = match victim_layer_arc.upgrade_to_inmemory_layer() {
        Some(inmem_layer) => inmem_layer,
        None => return Ok(()),
    };

    // Mark the layer as no longer accepting writes and record the end_lsn.
    // This happens in-place, no new layers are created now.
    // We call `get_last_record_lsn` again, which may be different from the
    // original load, as we may have released the write lock since then.
    victim_layer.freeze(self.get_last_record_lsn());

    // The layer is no longer open, update the layer map to reflect this.
    // We will replace it with on-disk historics below.
    layers.remove(layer_id);
    let frozen_layer_id = layers.insert_historic(victim_layer_arc.clone());

    // Write the now-frozen layer to disk. That could take a while, so
    // release the locks while we do it.
    drop(layers);
    drop(write_guard);

    let new_historics = victim_layer.write_to_disk(self)?;
    let created_historics = !new_historics.is_empty();

    write_guard = self.write_lock.lock().unwrap();
    layers = self.layers.lock().unwrap();

    // Finally, replace the frozen in-memory layer with the new on-disk layers
    layers.remove(frozen_layer_id);

    // Add the historics to the LayerMap
    for delta_layer in new_historics.delta_layers {
        // FIXME layer_uploads.push(delta_layer.path());
        layers.insert_historic(Arc::new(delta_layer));
    }
    for image_layer in new_historics.image_layers {
        // FIXME layer_uploads.push(image_layer.path());
        layers.insert_historic(Arc::new(image_layer));
    }

    // The layer map is up to date. Release the locks before the fsync, so
    // that other users of the timeline are not blocked on disk I/O.
    drop(layers);
    drop(write_guard);

    if created_historics {
        // We must fsync the timeline dir to ensure the directory entries for
        // new layer files are durable
        //
        // TODO: it's inefficient to do this after every eviction, if we're evicting
        // a lot of layers.
        let timeline_dir =
            File::open(self.conf.timeline_path(&self.timelineid, &self.tenantid))?;
        timeline_dir.sync_all()?;
    }

    Ok(())
}
/// ///
/// Garbage collect layer files on a timeline that are no longer needed. /// Garbage collect layer files on a timeline that are no longer needed.
/// ///
@@ -1440,7 +1542,7 @@ impl LayeredTimeline {
debug!("retain_lsns: {:?}", retain_lsns); debug!("retain_lsns: {:?}", retain_lsns);
let mut layers_to_remove: Vec<Arc<dyn Layer>> = Vec::new(); let mut layers_to_remove: Vec<LayerId> = Vec::new();
// Scan all on-disk layers in the timeline. // Scan all on-disk layers in the timeline.
// //
@@ -1451,7 +1553,7 @@ impl LayeredTimeline {
// 4. this layer doesn't serve as a tombstone for some older layer; // 4. this layer doesn't serve as a tombstone for some older layer;
// //
let mut layers = self.layers.lock().unwrap(); let mut layers = self.layers.lock().unwrap();
'outer: for l in layers.iter_historic_layers() { 'outer: for (layer_id, l) in layers.iter_historic_layers() {
let seg = l.get_seg_tag(); let seg = l.get_seg_tag();
if seg.rel.is_relation() { if seg.rel.is_relation() {
@@ -1462,7 +1564,7 @@ impl LayeredTimeline {
// 1. Is it newer than cutoff point? // 1. Is it newer than cutoff point?
if l.get_end_lsn() > cutoff { if l.get_end_lsn() > cutoff {
info!( trace!(
"keeping {} {}-{} because it's newer than cutoff {}", "keeping {} {}-{} because it's newer than cutoff {}",
seg, seg,
l.get_start_lsn(), l.get_start_lsn(),
@@ -1481,7 +1583,7 @@ impl LayeredTimeline {
for retain_lsn in &retain_lsns { for retain_lsn in &retain_lsns {
// start_lsn is inclusive and end_lsn is exclusive // start_lsn is inclusive and end_lsn is exclusive
if l.get_start_lsn() <= *retain_lsn && *retain_lsn < l.get_end_lsn() { if l.get_start_lsn() <= *retain_lsn && *retain_lsn < l.get_end_lsn() {
info!( trace!(
"keeping {} {}-{} because it's needed by branch point {}", "keeping {} {}-{} because it's needed by branch point {}",
seg, seg,
l.get_start_lsn(), l.get_start_lsn(),
@@ -1500,7 +1602,7 @@ impl LayeredTimeline {
// 3. Is there a later on-disk layer for this relation? // 3. Is there a later on-disk layer for this relation?
if !l.is_dropped() && !layers.newer_image_layer_exists(l.get_seg_tag(), l.get_end_lsn()) if !l.is_dropped() && !layers.newer_image_layer_exists(l.get_seg_tag(), l.get_end_lsn())
{ {
info!( trace!(
"keeping {} {}-{} because it is the latest layer", "keeping {} {}-{} because it is the latest layer",
seg, seg,
l.get_start_lsn(), l.get_start_lsn(),
@@ -1569,7 +1671,7 @@ impl LayeredTimeline {
} }
if is_tombstone { if is_tombstone {
info!( trace!(
"keeping {} {}-{} because this layer servers as a tombstome for older layer", "keeping {} {}-{} because this layer servers as a tombstome for older layer",
seg, seg,
l.get_start_lsn(), l.get_start_lsn(),
@@ -1593,27 +1695,32 @@ impl LayeredTimeline {
l.get_end_lsn(), l.get_end_lsn(),
l.is_dropped() l.is_dropped()
); );
layers_to_remove.push(Arc::clone(&l)); layers_to_remove.push(layer_id);
} }
// Actually delete the layers from disk and remove them from the map. // Actually delete the layers from disk and remove them from the map.
// (couldn't do this in the loop above, because you cannot modify a collection // (couldn't do this in the loop above, because you cannot modify a collection
// while iterating it. BTreeMap::retain() would be another option) // while iterating it. BTreeMap::retain() would be another option)
for doomed_layer in layers_to_remove { for doomed_layer_id in layers_to_remove {
doomed_layer.delete()?; if let Some(doomed_layer) = layers.get_with_id(doomed_layer_id) {
layers.remove_historic(doomed_layer.clone());
match ( doomed_layer.delete()?;
doomed_layer.is_dropped(), layers.remove(doomed_layer_id);
doomed_layer.get_seg_tag().rel.is_relation(), match (
) { doomed_layer.is_dropped(),
(true, true) => result.ondisk_relfiles_dropped += 1, doomed_layer.get_seg_tag().rel.is_relation(),
(true, false) => result.ondisk_nonrelfiles_dropped += 1, ) {
(false, true) => result.ondisk_relfiles_removed += 1, (true, true) => result.ondisk_relfiles_dropped += 1,
(false, false) => result.ondisk_nonrelfiles_removed += 1, (true, false) => result.ondisk_nonrelfiles_dropped += 1,
(false, true) => result.ondisk_relfiles_removed += 1,
(false, false) => result.ondisk_nonrelfiles_removed += 1,
}
} }
} }
let mut guard = self.last_gc.lock().unwrap();
*guard = Some(cutoff);
result.elapsed = now.elapsed(); result.elapsed = now.elapsed();
Ok(result) Ok(result)
} }
@@ -1806,9 +1913,10 @@ impl<'a> TimelineWriter for LayeredTimelineWriter<'a> {
let seg = SegmentTag::from_blknum(rel, blknum); let seg = SegmentTag::from_blknum(rel, blknum);
let layer = self.tl.get_layer_for_write(seg, lsn)?; let layer = self.tl.get_layer_for_write(seg, lsn)?;
let delta_size = layer.put_wal_record(lsn, blknum, rec); let delta_size = layer.upgrade_to_inmemory_layer().unwrap().put_wal_record(lsn, blknum, rec);
self.tl self.tl
.increase_current_logical_size(delta_size * BLCKSZ as u32); .increase_current_logical_size(delta_size * BLCKSZ as u32);
Ok(()) Ok(())
} }
@@ -1825,7 +1933,7 @@ impl<'a> TimelineWriter for LayeredTimelineWriter<'a> {
let seg = SegmentTag::from_blknum(rel, blknum); let seg = SegmentTag::from_blknum(rel, blknum);
let layer = self.tl.get_layer_for_write(seg, lsn)?; let layer = self.tl.get_layer_for_write(seg, lsn)?;
let delta_size = layer.put_page_image(blknum, lsn, img); let delta_size = layer.upgrade_to_inmemory_layer().unwrap().put_page_image(blknum, lsn, img);
self.tl self.tl
.increase_current_logical_size(delta_size * BLCKSZ as u32); .increase_current_logical_size(delta_size * BLCKSZ as u32);
@@ -1870,7 +1978,7 @@ impl<'a> TimelineWriter for LayeredTimelineWriter<'a> {
}; };
let layer = self.tl.get_layer_for_write(seg, lsn)?; let layer = self.tl.get_layer_for_write(seg, lsn)?;
layer.drop_segment(lsn); layer.upgrade_to_inmemory_layer().unwrap().drop_segment(lsn);
} }
// Truncate the last remaining segment to the specified size // Truncate the last remaining segment to the specified size
@@ -1880,7 +1988,7 @@ impl<'a> TimelineWriter for LayeredTimelineWriter<'a> {
segno: last_remain_seg, segno: last_remain_seg,
}; };
let layer = self.tl.get_layer_for_write(seg, lsn)?; let layer = self.tl.get_layer_for_write(seg, lsn)?;
layer.put_truncation(lsn, relsize % RELISH_SEG_SIZE) layer.upgrade_to_inmemory_layer().unwrap().put_truncation(lsn, relsize % RELISH_SEG_SIZE)
} }
self.tl self.tl
.decrease_current_logical_size((oldsize - relsize) * BLCKSZ as u32); .decrease_current_logical_size((oldsize - relsize) * BLCKSZ as u32);
@@ -1908,7 +2016,7 @@ impl<'a> TimelineWriter for LayeredTimelineWriter<'a> {
segno: remove_segno, segno: remove_segno,
}; };
let layer = self.tl.get_layer_for_write(seg, lsn)?; let layer = self.tl.get_layer_for_write(seg, lsn)?;
layer.drop_segment(lsn); layer.upgrade_to_inmemory_layer().unwrap().drop_segment(lsn);
} }
self.tl self.tl
.decrease_current_logical_size(oldsize * BLCKSZ as u32); .decrease_current_logical_size(oldsize * BLCKSZ as u32);
@@ -1922,7 +2030,7 @@ impl<'a> TimelineWriter for LayeredTimelineWriter<'a> {
// TODO handle TwoPhase relishes // TODO handle TwoPhase relishes
let seg = SegmentTag::from_blknum(rel, 0); let seg = SegmentTag::from_blknum(rel, 0);
let layer = self.tl.get_layer_for_write(seg, lsn)?; let layer = self.tl.get_layer_for_write(seg, lsn)?;
layer.drop_segment(lsn); layer.upgrade_to_inmemory_layer().unwrap().drop_segment(lsn);
} }
Ok(()) Ok(())

View File

@@ -6,8 +6,8 @@ use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
pub struct BlobRange { pub struct BlobRange {
offset: u64, pub offset: u64,
size: usize, pub size: usize,
} }
pub fn read_blob(reader: &BoundedReader<&'_ File>, range: &BlobRange) -> Result<Vec<u8>> { pub fn read_blob(reader: &BoundedReader<&'_ File>, range: &BlobRange) -> Result<Vec<u8>> {

View File

@@ -42,6 +42,7 @@ use crate::layered_repository::filename::{DeltaFileName, PathOrConf};
use crate::layered_repository::storage_layer::{ use crate::layered_repository::storage_layer::{
Layer, PageReconstructData, PageReconstructResult, PageVersion, SegmentTag, Layer, PageReconstructData, PageReconstructResult, PageVersion, SegmentTag,
}; };
use crate::vfd::VirtualFile;
use crate::waldecoder; use crate::waldecoder;
use crate::PageServerConf; use crate::PageServerConf;
use crate::{ZTenantId, ZTimelineId}; use crate::{ZTenantId, ZTimelineId};
@@ -145,9 +146,15 @@ pub struct DeltaLayerInner {
/// `relsizes` tracks the size of the relation at different points in time. /// `relsizes` tracks the size of the relation at different points in time.
relsizes: VecMap<Lsn, u32>, relsizes: VecMap<Lsn, u32>,
vfile: VirtualFile,
} }
impl Layer for DeltaLayer { impl Layer for DeltaLayer {
/// Returns the tenant id stored in this delta layer.
fn get_tenant_id(&self) -> ZTenantId {
self.tenantid
}
fn get_timeline_id(&self) -> ZTimelineId { fn get_timeline_id(&self) -> ZTimelineId {
self.timelineid self.timelineid
} }
@@ -186,9 +193,11 @@ impl Layer for DeltaLayer {
{ {
// Open the file and lock the metadata in memory // Open the file and lock the metadata in memory
// TODO: avoid opening the file for each read // TODO: avoid opening the file for each read
let (_path, book) = self.open_book()?; let mut inner = self.load()?;
let file = inner.vfile.open()?;
let book = Book::new(file)?;
let page_version_reader = book.chapter_reader(PAGE_VERSIONS_CHAPTER)?; let page_version_reader = book.chapter_reader(PAGE_VERSIONS_CHAPTER)?;
let inner = self.load()?;
// Scan the metadata BTreeMap backwards, starting from the given entry. // Scan the metadata BTreeMap backwards, starting from the given entry.
let minkey = (blknum, Lsn(0)); let minkey = (blknum, Lsn(0));
@@ -221,6 +230,9 @@ impl Layer for DeltaLayer {
} }
// release metadata lock and close the file // release metadata lock and close the file
let file = book.close();
inner.vfile.cache(file);
} }
// If an older page image is needed to reconstruct the page, let the // If an older page image is needed to reconstruct the page, let the
@@ -365,6 +377,18 @@ impl DeltaLayer {
assert!(!relsizes.is_empty()); assert!(!relsizes.is_empty());
} }
let path = Self::path_for(
&PathOrConf::Conf(conf),
timelineid,
tenantid,
&DeltaFileName {
seg: seg,
start_lsn: start_lsn,
end_lsn: end_lsn,
dropped: dropped,
}
);
let delta_layer = DeltaLayer { let delta_layer = DeltaLayer {
path_or_conf: PathOrConf::Conf(conf), path_or_conf: PathOrConf::Conf(conf),
timelineid, timelineid,
@@ -377,6 +401,7 @@ impl DeltaLayer {
loaded: true, loaded: true,
page_version_metas: VecMap::default(), page_version_metas: VecMap::default(),
relsizes, relsizes,
vfile: VirtualFile::new(&path),
}), }),
}; };
let mut inner = delta_layer.inner.lock().unwrap(); let mut inner = delta_layer.inner.lock().unwrap();
@@ -496,11 +521,9 @@ impl DeltaLayer {
debug!("loaded from {}", &path.display()); debug!("loaded from {}", &path.display());
*inner = DeltaLayerInner { inner.loaded = true;
loaded: true, inner.page_version_metas = page_version_metas;
page_version_metas, inner.relsizes = relsizes;
relsizes,
};
Ok(inner) Ok(inner)
} }
@@ -512,6 +535,13 @@ impl DeltaLayer {
tenantid: ZTenantId, tenantid: ZTenantId,
filename: &DeltaFileName, filename: &DeltaFileName,
) -> DeltaLayer { ) -> DeltaLayer {
let path = Self::path_for(
&PathOrConf::Conf(conf),
timelineid,
tenantid,
&filename,
);
DeltaLayer { DeltaLayer {
path_or_conf: PathOrConf::Conf(conf), path_or_conf: PathOrConf::Conf(conf),
timelineid, timelineid,
@@ -524,6 +554,7 @@ impl DeltaLayer {
loaded: false, loaded: false,
page_version_metas: VecMap::default(), page_version_metas: VecMap::default(),
relsizes: VecMap::default(), relsizes: VecMap::default(),
vfile: VirtualFile::new(&path),
}), }),
} }
} }
@@ -534,7 +565,7 @@ impl DeltaLayer {
pub fn new_for_path(path: &Path, book: &Book<File>) -> Result<Self> { pub fn new_for_path(path: &Path, book: &Book<File>) -> Result<Self> {
let chapter = book.read_chapter(SUMMARY_CHAPTER)?; let chapter = book.read_chapter(SUMMARY_CHAPTER)?;
let summary = Summary::des(&chapter)?; let summary = Summary::des(&chapter)?;
Ok(DeltaLayer { Ok(DeltaLayer {
path_or_conf: PathOrConf::Path(path.to_path_buf()), path_or_conf: PathOrConf::Path(path.to_path_buf()),
timelineid: summary.timelineid, timelineid: summary.timelineid,
@@ -547,6 +578,7 @@ impl DeltaLayer {
loaded: false, loaded: false,
page_version_metas: VecMap::default(), page_version_metas: VecMap::default(),
relsizes: VecMap::default(), relsizes: VecMap::default(),
vfile: VirtualFile::new(path),
}), }),
}) })
} }

View File

@@ -29,6 +29,7 @@ use crate::layered_repository::LayeredTimeline;
use crate::layered_repository::RELISH_SEG_SIZE; use crate::layered_repository::RELISH_SEG_SIZE;
use crate::PageServerConf; use crate::PageServerConf;
use crate::{ZTenantId, ZTimelineId}; use crate::{ZTenantId, ZTimelineId};
use crate::vfd::VirtualFile;
use anyhow::{anyhow, bail, ensure, Result}; use anyhow::{anyhow, bail, ensure, Result};
use bytes::Bytes; use bytes::Bytes;
use log::*; use log::*;
@@ -36,7 +37,7 @@ use serde::{Deserialize, Serialize};
use std::convert::TryInto; use std::convert::TryInto;
use std::fs; use std::fs;
use std::fs::File; use std::fs::File;
use std::io::{BufWriter, Write}; use std::io::{BufWriter, Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::{Mutex, MutexGuard}; use std::sync::{Mutex, MutexGuard};
@@ -110,6 +111,8 @@ pub struct ImageLayerInner {
/// Derived from filename and bookfile chapter metadata /// Derived from filename and bookfile chapter metadata
image_type: ImageType, image_type: ImageType,
vfile: VirtualFile,
} }
impl Layer for ImageLayer { impl Layer for ImageLayer {
@@ -117,6 +120,10 @@ impl Layer for ImageLayer {
PathBuf::from(self.layer_name().to_string()) PathBuf::from(self.layer_name().to_string())
} }
fn get_tenant_id(&self) -> ZTenantId {
self.tenantid
}
fn get_timeline_id(&self) -> ZTimelineId { fn get_timeline_id(&self) -> ZTimelineId {
self.timelineid self.timelineid
} }
@@ -147,11 +154,12 @@ impl Layer for ImageLayer {
) -> Result<PageReconstructResult> { ) -> Result<PageReconstructResult> {
assert!(lsn >= self.lsn); assert!(lsn >= self.lsn);
let inner = self.load()?; let mut inner = self.load()?;
let base_blknum = blknum % RELISH_SEG_SIZE; let base_blknum = blknum % RELISH_SEG_SIZE;
let (_path, book) = self.open_book()?; let mut file = inner.vfile.open()?;
let mut book = Book::new(&mut file)?;
let buf = match &inner.image_type { let buf = match &inner.image_type {
ImageType::Blocky { num_blocks } => { ImageType::Blocky { num_blocks } => {
@@ -162,17 +170,20 @@ impl Layer for ImageLayer {
let mut buf = vec![0u8; BLOCK_SIZE]; let mut buf = vec![0u8; BLOCK_SIZE];
let offset = BLOCK_SIZE as u64 * base_blknum as u64; let offset = BLOCK_SIZE as u64 * base_blknum as u64;
let chapter = book.chapter_reader(BLOCKY_IMAGES_CHAPTER)?; let mut chapter = book.exclusive_chapter_reader(BLOCKY_IMAGES_CHAPTER)?;
chapter.read_exact_at(&mut buf, offset)?; chapter.seek(SeekFrom::Start(offset))?;
chapter.read_exact(&mut buf)?;
buf buf
} }
ImageType::NonBlocky => { ImageType::NonBlocky => {
ensure!(base_blknum == 0); ensure!(base_blknum == 0);
book.read_chapter(NONBLOCKY_IMAGE_CHAPTER)?.into_vec() book.exclusive_read_chapter(NONBLOCKY_IMAGE_CHAPTER)?.into_vec()
} }
}; };
inner.vfile.cache(file);
reconstruct_data.page_img = Some(Bytes::from(buf)); reconstruct_data.page_img = Some(Bytes::from(buf));
Ok(PageReconstructResult::Complete) Ok(PageReconstructResult::Complete)
} }
@@ -266,6 +277,16 @@ impl ImageLayer {
ImageType::NonBlocky ImageType::NonBlocky
}; };
let path = Self::path_for(
&PathOrConf::Conf(conf),
timelineid,
tenantid,
&ImageFileName {
seg: seg,
lsn: lsn,
}
);
let layer = ImageLayer { let layer = ImageLayer {
path_or_conf: PathOrConf::Conf(conf), path_or_conf: PathOrConf::Conf(conf),
timelineid, timelineid,
@@ -275,12 +296,12 @@ impl ImageLayer {
inner: Mutex::new(ImageLayerInner { inner: Mutex::new(ImageLayerInner {
loaded: true, loaded: true,
image_type: image_type.clone(), image_type: image_type.clone(),
vfile: VirtualFile::new(&path),
}), }),
}; };
let inner = layer.inner.lock().unwrap(); let inner = layer.inner.lock().unwrap();
// Write the images into a file // Write the images into a file
let path = layer.path();
// Note: This overwrites any existing file. There shouldn't be any. // Note: This overwrites any existing file. There shouldn't be any.
// FIXME: throw an error instead? // FIXME: throw an error instead?
let file = File::create(&path)?; let file = File::create(&path)?;
@@ -374,7 +395,8 @@ impl ImageLayer {
return Ok(inner); return Ok(inner);
} }
let (path, book) = self.open_book()?;
let book = Book::new(inner.vfile.open()?)?;
match &self.path_or_conf { match &self.path_or_conf {
PathOrConf::Conf(_) => { PathOrConf::Conf(_) => {
@@ -412,12 +434,10 @@ impl ImageLayer {
ImageType::NonBlocky ImageType::NonBlocky
}; };
debug!("loaded from {}", &path.display()); debug!("loaded from {}", &self.path().display());
*inner = ImageLayerInner { inner.loaded = true;
loaded: true, inner.image_type = image_type;
image_type,
};
Ok(inner) Ok(inner)
} }
@@ -438,6 +458,14 @@ impl ImageLayer {
tenantid: ZTenantId, tenantid: ZTenantId,
filename: &ImageFileName, filename: &ImageFileName,
) -> ImageLayer { ) -> ImageLayer {
let path = Self::path_for(
&PathOrConf::Conf(conf),
timelineid,
tenantid,
filename,
);
ImageLayer { ImageLayer {
path_or_conf: PathOrConf::Conf(conf), path_or_conf: PathOrConf::Conf(conf),
timelineid, timelineid,
@@ -447,6 +475,7 @@ impl ImageLayer {
inner: Mutex::new(ImageLayerInner { inner: Mutex::new(ImageLayerInner {
loaded: false, loaded: false,
image_type: ImageType::Blocky { num_blocks: 0 }, image_type: ImageType::Blocky { num_blocks: 0 },
vfile: VirtualFile::new(&path),
}), }),
} }
} }
@@ -467,6 +496,7 @@ impl ImageLayer {
inner: Mutex::new(ImageLayerInner { inner: Mutex::new(ImageLayerInner {
loaded: false, loaded: false,
image_type: ImageType::Blocky { num_blocks: 0 }, image_type: ImageType::Blocky { num_blocks: 0 },
vfile: VirtualFile::new(path),
}), }),
}) })
} }

View File

@@ -90,6 +90,11 @@ impl InMemoryLayerInner {
} }
impl Layer for InMemoryLayer { impl Layer for InMemoryLayer {
fn upgrade_to_inmemory_layer(&self) -> Option<&InMemoryLayer> {
Some(self)
}
// An in-memory layer doesn't really have a filename as it's not stored on disk, // An in-memory layer doesn't really have a filename as it's not stored on disk,
// but we construct a filename as if it was a delta layer // but we construct a filename as if it was a delta layer
fn filename(&self) -> PathBuf { fn filename(&self) -> PathBuf {
@@ -113,6 +118,10 @@ impl Layer for InMemoryLayer {
PathBuf::from(format!("inmem-{}", delta_filename)) PathBuf::from(format!("inmem-{}", delta_filename))
} }
fn get_tenant_id(&self) -> ZTenantId {
self.tenantid
}
fn get_timeline_id(&self) -> ZTimelineId { fn get_timeline_id(&self) -> ZTimelineId {
self.timelineid self.timelineid
} }

View File

@@ -41,23 +41,22 @@
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::fmt::Debug; use std::fmt::Debug;
use std::ops::Range; use std::ops::Range;
use std::sync::Arc;
pub struct IntervalTree<I: ?Sized> pub struct IntervalTree<I>
where where
I: IntervalItem, I: IntervalItem,
{ {
points: BTreeMap<I::Key, Point<I>>, points: BTreeMap<I::Key, Point<I>>,
} }
struct Point<I: ?Sized> { struct Point<I> {
/// All intervals that contain this point, in no particular order. /// All intervals that contain this point, in no particular order.
/// ///
/// We assume that there aren't a lot of overlapping intervals, so that this vector /// We assume that there aren't a lot of overlapping intervals, so that this vector
/// never grows very large. If that assumption doesn't hold, we could keep this ordered /// never grows very large. If that assumption doesn't hold, we could keep this ordered
/// by the end bound, to speed up `search`. But as long as there are only a few elements, /// by the end bound, to speed up `search`. But as long as there are only a few elements,
/// a linear search is OK. /// a linear search is OK.
elements: Vec<Arc<I>>, elements: Vec<I>,
} }
/// Abstraction for an interval that can be stored in the tree /// Abstraction for an interval that can be stored in the tree
@@ -75,14 +74,14 @@ pub trait IntervalItem {
} }
} }
impl<I: ?Sized> IntervalTree<I> impl<I> IntervalTree<I>
where where
I: IntervalItem, I: IntervalItem + PartialEq + Clone,
{ {
/// Return an element that contains 'key', or precedes it. /// Return an element that contains 'key', or precedes it.
/// ///
/// If there are multiple candidates, returns the one with the highest 'end' key. /// If there are multiple candidates, returns the one with the highest 'end' key.
pub fn search(&self, key: I::Key) -> Option<Arc<I>> { pub fn search(&self, key: I::Key) -> Option<&I> {
// Find the greatest point that precedes or is equal to the search key. If there is // Find the greatest point that precedes or is equal to the search key. If there is
// none, returns None. // none, returns None.
let (_, p) = self.points.range(..=key).next_back()?; let (_, p) = self.points.range(..=key).next_back()?;
@@ -100,7 +99,7 @@ where
} }
}) })
.unwrap(); .unwrap();
Some(Arc::clone(highest_item)) Some(highest_item)
} }
/// Iterate over all items with start bound >= 'key' /// Iterate over all items with start bound >= 'key'
@@ -119,7 +118,7 @@ where
} }
} }
pub fn insert(&mut self, item: Arc<I>) { pub fn insert(&mut self, item: &I) {
let start_key = item.start_key(); let start_key = item.start_key();
let end_key = item.end_key(); let end_key = item.end_key();
assert!(start_key < end_key); assert!(start_key < end_key);
@@ -133,18 +132,18 @@ where
found_start_point = true; found_start_point = true;
// It is an error to insert the same item to the tree twice. // It is an error to insert the same item to the tree twice.
assert!( assert!(
!point.elements.iter().any(|x| Arc::ptr_eq(x, &item)), !point.elements.iter().any(|x| x == item),
"interval is already in the tree" "interval is already in the tree"
); );
} }
point.elements.push(Arc::clone(&item)); point.elements.push(item.clone());
} }
if !found_start_point { if !found_start_point {
// Create a new Point for the starting point // Create a new Point for the starting point
// Look at the previous point, and copy over elements that overlap with this // Look at the previous point, and copy over elements that overlap with this
// new point // new point
let mut new_elements: Vec<Arc<I>> = Vec::new(); let mut new_elements: Vec<I> = Vec::new();
if let Some((_, prev_point)) = self.points.range(..start_key).next_back() { if let Some((_, prev_point)) = self.points.range(..start_key).next_back() {
let overlapping_prev_elements = prev_point let overlapping_prev_elements = prev_point
.elements .elements
@@ -154,7 +153,7 @@ where
new_elements.extend(overlapping_prev_elements); new_elements.extend(overlapping_prev_elements);
} }
new_elements.push(item); new_elements.push(item.clone());
let new_point = Point { let new_point = Point {
elements: new_elements, elements: new_elements,
@@ -163,7 +162,7 @@ where
} }
} }
pub fn remove(&mut self, item: &Arc<I>) { pub fn remove(&mut self, item: &I) {
// range search points // range search points
let start_key = item.start_key(); let start_key = item.start_key();
let end_key = item.end_key(); let end_key = item.end_key();
@@ -176,7 +175,7 @@ where
found_start_point = true; found_start_point = true;
} }
let len_before = point.elements.len(); let len_before = point.elements.len();
point.elements.retain(|other| !Arc::ptr_eq(other, item)); point.elements.retain(|other| other != item);
let len_after = point.elements.len(); let len_after = point.elements.len();
assert_eq!(len_after + 1, len_before); assert_eq!(len_after + 1, len_before);
if len_after == 0 { if len_after == 0 {
@@ -191,19 +190,19 @@ where
} }
} }
pub struct IntervalIter<'a, I: ?Sized> pub struct IntervalIter<'a, I>
where where
I: IntervalItem, I: IntervalItem,
{ {
point_iter: std::collections::btree_map::Range<'a, I::Key, Point<I>>, point_iter: std::collections::btree_map::Range<'a, I::Key, Point<I>>,
elem_iter: Option<(I::Key, std::slice::Iter<'a, Arc<I>>)>, elem_iter: Option<(I::Key, std::slice::Iter<'a, I>)>,
} }
impl<'a, I> Iterator for IntervalIter<'a, I> impl<'a, I> Iterator for IntervalIter<'a, I>
where where
I: IntervalItem + ?Sized, I: IntervalItem,
{ {
type Item = Arc<I>; type Item = &'a I;
fn next(&mut self) -> Option<Self::Item> { fn next(&mut self) -> Option<Self::Item> {
// Iterate over all elements in all the points in 'point_iter'. To avoid // Iterate over all elements in all the points in 'point_iter'. To avoid
@@ -214,7 +213,7 @@ where
if let Some((point_key, elem_iter)) = &mut self.elem_iter { if let Some((point_key, elem_iter)) = &mut self.elem_iter {
for elem in elem_iter { for elem in elem_iter {
if elem.start_key() == *point_key { if elem.start_key() == *point_key {
return Some(Arc::clone(elem)); return Some(elem);
} }
} }
} }
@@ -230,7 +229,7 @@ where
} }
} }
impl<I: ?Sized> Default for IntervalTree<I> impl<I> Default for IntervalTree<I>
where where
I: IntervalItem, I: IntervalItem,
{ {
@@ -246,7 +245,7 @@ mod tests {
use super::*; use super::*;
use std::fmt; use std::fmt;
#[derive(Debug)] #[derive(Debug, Clone, PartialEq)]
struct MockItem { struct MockItem {
start_key: u32, start_key: u32,
end_key: u32, end_key: u32,
@@ -288,7 +287,7 @@ mod tests {
tree: &IntervalTree<MockItem>, tree: &IntervalTree<MockItem>,
key: u32, key: u32,
expected: &[&str], expected: &[&str],
) -> Option<Arc<MockItem>> { ) -> Option<MockItem> {
if let Some(v) = tree.search(key) { if let Some(v) = tree.search(key) {
let vstr = v.to_string(); let vstr = v.to_string();
@@ -299,7 +298,7 @@ mod tests {
key, v, expected, key, v, expected,
); );
Some(v) Some(v.clone())
} else { } else {
assert!( assert!(
expected.is_empty(), expected.is_empty(),
@@ -331,12 +330,12 @@ mod tests {
let mut tree: IntervalTree<MockItem> = IntervalTree::default(); let mut tree: IntervalTree<MockItem> = IntervalTree::default();
// Simple, non-overlapping ranges. // Simple, non-overlapping ranges.
tree.insert(Arc::new(MockItem::new(10, 11))); tree.insert(&MockItem::new(10, 11));
tree.insert(Arc::new(MockItem::new(11, 12))); tree.insert(&MockItem::new(11, 12));
tree.insert(Arc::new(MockItem::new(12, 13))); tree.insert(&MockItem::new(12, 13));
tree.insert(Arc::new(MockItem::new(18, 19))); tree.insert(&MockItem::new(18, 19));
tree.insert(Arc::new(MockItem::new(17, 18))); tree.insert(&MockItem::new(17, 18));
tree.insert(Arc::new(MockItem::new(15, 16))); tree.insert(&MockItem::new(15, 16));
assert_search(&tree, 9, &[]); assert_search(&tree, 9, &[]);
assert_search(&tree, 10, &["10-11"]); assert_search(&tree, 10, &["10-11"]);
@@ -370,13 +369,13 @@ mod tests {
let mut tree: IntervalTree<MockItem> = IntervalTree::default(); let mut tree: IntervalTree<MockItem> = IntervalTree::default();
// Overlapping items // Overlapping items
tree.insert(Arc::new(MockItem::new(22, 24))); tree.insert(&MockItem::new(22, 24));
tree.insert(Arc::new(MockItem::new(23, 25))); tree.insert(&MockItem::new(23, 25));
let x24_26 = Arc::new(MockItem::new(24, 26)); let x24_26 = MockItem::new(24, 26);
tree.insert(Arc::clone(&x24_26)); tree.insert(&x24_26);
let x26_28 = Arc::new(MockItem::new(26, 28)); let x26_28 = MockItem::new(26, 28);
tree.insert(Arc::clone(&x26_28)); tree.insert(&x26_28);
tree.insert(Arc::new(MockItem::new(25, 27))); tree.insert(&MockItem::new(25, 27));
assert_search(&tree, 22, &["22-24"]); assert_search(&tree, 22, &["22-24"]);
assert_search(&tree, 23, &["22-24", "23-25"]); assert_search(&tree, 23, &["22-24", "23-25"]);
@@ -403,10 +402,10 @@ mod tests {
let mut tree: IntervalTree<MockItem> = IntervalTree::default(); let mut tree: IntervalTree<MockItem> = IntervalTree::default();
// Items containing other items // Items containing other items
tree.insert(Arc::new(MockItem::new(31, 39))); tree.insert(&MockItem::new(31, 39));
tree.insert(Arc::new(MockItem::new(32, 34))); tree.insert(&MockItem::new(32, 34));
tree.insert(Arc::new(MockItem::new(33, 35))); tree.insert(&MockItem::new(33, 35));
tree.insert(Arc::new(MockItem::new(30, 40))); tree.insert(&MockItem::new(30, 40));
assert_search(&tree, 30, &["30-40"]); assert_search(&tree, 30, &["30-40"]);
assert_search(&tree, 31, &["30-40", "31-39"]); assert_search(&tree, 31, &["30-40", "31-39"]);
@@ -427,16 +426,16 @@ mod tests {
let mut tree: IntervalTree<MockItem> = IntervalTree::default(); let mut tree: IntervalTree<MockItem> = IntervalTree::default();
// Duplicate keys // Duplicate keys
let item_a = Arc::new(MockItem::new_str(55, 56, "a")); let item_a = MockItem::new_str(55, 56, "a");
tree.insert(Arc::clone(&item_a)); tree.insert(&item_a);
let item_b = Arc::new(MockItem::new_str(55, 56, "b")); let item_b = MockItem::new_str(55, 56, "b");
tree.insert(Arc::clone(&item_b)); tree.insert(&item_b);
let item_c = Arc::new(MockItem::new_str(55, 56, "c")); let item_c = MockItem::new_str(55, 56, "c");
tree.insert(Arc::clone(&item_c)); tree.insert(&item_c);
let item_d = Arc::new(MockItem::new_str(54, 56, "d")); let item_d = MockItem::new_str(54, 56, "d");
tree.insert(Arc::clone(&item_d)); tree.insert(&item_d);
let item_e = Arc::new(MockItem::new_str(55, 57, "e")); let item_e = MockItem::new_str(55, 57, "e");
tree.insert(Arc::clone(&item_e)); tree.insert(&item_e);
dump_tree(&tree); dump_tree(&tree);
@@ -461,8 +460,8 @@ mod tests {
let mut tree: IntervalTree<MockItem> = IntervalTree::default(); let mut tree: IntervalTree<MockItem> = IntervalTree::default();
// Inserting the same item twice is not cool // Inserting the same item twice is not cool
let item = Arc::new(MockItem::new(1, 2)); let item = MockItem::new(1, 2);
tree.insert(Arc::clone(&item)); tree.insert(&item);
tree.insert(Arc::clone(&item)); // fails assertion tree.insert(&item); // fails assertion
} }
} }

View File

@@ -0,0 +1,129 @@
use crate::tenant_mgr;
use crate::layered_repository::layer_map;
use crate::PageServerConf;
use anyhow::Result;
use lazy_static::lazy_static;
use tracing::*;
use std::collections::VecDeque;
use std::sync::Mutex;
use std::thread::JoinHandle;
use zenith_utils::zid::{ZTenantId, ZTimelineId};
lazy_static! {
// Process-wide FIFO of pending background jobs. Producers append through
// `schedule_job`; the global job thread pops entries in `global_job_loop`.
// Every access goes through this mutex.
static ref JOB_QUEUE: Mutex<GlobalJobQueue> = Mutex::new(GlobalJobQueue::default());
}
/// Contents of the global job queue that lives behind the `JOB_QUEUE` mutex.
#[derive(Default)]
struct GlobalJobQueue {
/// Pending jobs in submission order: pushed at the back, popped from the front.
jobs: VecDeque<GlobalJob>,
}
/// A unit of background work that can be queued with `schedule_job` and is
/// later executed by the global job thread.
pub enum GlobalJob {
/// To release memory
EvictSomeLayer,
/// To advance 'disk_consistent_lsn'
CheckpointTimeline(ZTenantId, ZTimelineId),
/// To free up disk space
GarbageCollect(ZTenantId),
}
/// Append `job` to the back of the global job queue.
///
/// The queue lock is taken only for the duration of the push. Panics if the
/// queue mutex has been poisoned.
pub fn schedule_job(job: GlobalJob) {
    JOB_QUEUE.lock().unwrap().jobs.push_back(job);
}
///
/// Spawn the thread that runs `global_job_loop` and hand back its join handle.
///
/// Panics if the OS refuses to create the thread.
///
/// TODO: This ought to be a pool of threads
///
pub fn launch_global_job_thread(conf: &'static PageServerConf) -> JoinHandle<()> {
    let builder = std::thread::Builder::new().name("Global Job thread".into());

    let thread_main = move || {
        // FIXME: relaunch it? Panic is not good.
        global_job_loop(conf).expect("Global job thread died");
    };

    builder.spawn(thread_main).unwrap()
}
pub fn global_job_loop(conf: &'static PageServerConf) -> Result<()> {
while !tenant_mgr::shutdown_requested() {
std::thread::sleep(conf.checkpoint_period);
info!("global job thread waking up");
let mut queue = JOB_QUEUE.lock().unwrap();
while let Some(job) = queue.jobs.pop_front() {
drop(queue);
let result = match job {
GlobalJob::EvictSomeLayer => {
evict_layer()
},
GlobalJob::CheckpointTimeline(tenantid, timelineid) => {
checkpoint_timeline(tenantid, timelineid)
}
GlobalJob::GarbageCollect(tenantid) => {
gc_tenant(tenantid)
}
};
if let Err(err) = result {
error!("job ended in error: {:#}", err);
}
queue = JOB_QUEUE.lock().unwrap();
}
}
trace!("Checkpointer thread shut down");
Ok(())
}
// Evict open layers for as long as the global layer registry keeps naming
// victims.
//
// For each victim id, the layer is looked up to learn its tenant and timeline,
// and the owning timeline is then asked to evict it. Ids whose layer has
// already disappeared from the registry are skipped. The first error aborts
// the whole run.
//
// NOTE(review): on an early error return nothing here clears the registry's
// "eviction scheduled" flag; confirm that a later job gets scheduled.
fn evict_layer() -> Result<()> {
    while let Some(layer_id) = layer_map::find_victim() {
        if let Some(victim_layer) = layer_map::get_layer(layer_id) {
            let tenantid = victim_layer.get_tenant_id();
            let timelineid = victim_layer.get_timeline_id();

            let span = info_span!("global evict", timeline = %timelineid, tenant = %tenantid);
            let _entered = span.entered();
            info!("evicting {}", victim_layer.filename().display());

            // Give up our reference before handing over to the timeline.
            drop(victim_layer);

            let timeline = tenant_mgr::get_timeline_for_tenant(tenantid, timelineid)?;
            timeline.upgrade_to_layered_timeline().evict_layer(layer_id)?;
        }
    }

    info!("no more eviction needed");
    Ok(())
}
/// Run a scheduled checkpoint on the given timeline of the given tenant.
///
/// Fails if the timeline cannot be looked up or if the checkpoint itself fails.
fn checkpoint_timeline(tenantid: ZTenantId, timelineid: ZTimelineId) -> Result<()> {
    tenant_mgr::get_timeline_for_tenant(tenantid, timelineid)?.checkpoint_scheduled()
}
/// Run a scheduled garbage collection on the repository of the given tenant.
///
/// Whatever the GC run returns on success is discarded; only errors are
/// propagated.
fn gc_tenant(tenantid: ZTenantId) -> Result<()> {
    let repository = tenant_mgr::get_repository_for_tenant(tenantid)?;
    let _gc_outcome = repository.gc_scheduled()?;
    Ok(())
}

View File

@@ -9,6 +9,23 @@
//! new image and delta layers and corresponding files are written to disk. //! new image and delta layers and corresponding files are written to disk.
//! //!
//
// Global layer registry:
//
// Every layer is inserted into the global registry, and assigned an ID
//
// The global registry tracks memory usage and usage count for each layer
//
//
// In addition to that, there is a per-timeline LayerMap, used for lookups
//
//
use crate::layered_repository::{schedule_job, GlobalJob};
use crate::layered_repository::interval_tree::{IntervalItem, IntervalIter, IntervalTree}; use crate::layered_repository::interval_tree::{IntervalItem, IntervalIter, IntervalTree};
use crate::layered_repository::storage_layer::{Layer, SegmentTag}; use crate::layered_repository::storage_layer::{Layer, SegmentTag};
use crate::layered_repository::InMemoryLayer; use crate::layered_repository::InMemoryLayer;
@@ -17,7 +34,8 @@ use anyhow::Result;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use std::cmp::Ordering; use std::cmp::Ordering;
use std::collections::{BinaryHeap, HashMap}; use std::collections::{BinaryHeap, HashMap};
use std::sync::Arc; use std::sync::{Arc, Mutex};
use tracing::*;
use zenith_metrics::{register_int_gauge, IntGauge}; use zenith_metrics::{register_int_gauge, IntGauge};
use zenith_utils::lsn::Lsn; use zenith_utils::lsn::Lsn;
@@ -28,6 +46,176 @@ lazy_static! {
static ref NUM_ONDISK_LAYERS: IntGauge = static ref NUM_ONDISK_LAYERS: IntGauge =
register_int_gauge!("pageserver_ondisk_layers", "Number of layers on-disk") register_int_gauge!("pageserver_ondisk_layers", "Number of layers on-disk")
.expect("failed to define a metric"); .expect("failed to define a metric");
// Global layer map
static ref LAYERS: Mutex<GlobalLayerMap> = Mutex::new(GlobalLayerMap::new());
}
// Once the number of live open (in-memory) layers reaches this value, a
// global eviction job is scheduled and `find_victim` starts naming victims.
const MAX_OPEN_LAYERS: usize = 10;
// NOTE(review): not referenced by the code visible here; presumably a limit
// on loaded historic layers - confirm before relying on it.
const MAX_LOADED_LAYERS: usize = 100;
// One slot in the global layer registry.
struct GlobalLayerEntry {
tag: u64, // to fix ABA problem
// The registered layer; set to None when the layer is removed. The slot
// itself stays in the vector.
layer: Option<Arc<dyn Layer>>,
// Bumped on lookup (up to a small cap) and decremented by the clock sweep
// in `find_victim`; a slot at zero is eligible for eviction.
usage_count: u32,
}
// Global registry of all layers, shared by every timeline through the
// `LAYERS` mutex. Layers are addressed by `LayerId`.
struct GlobalLayerMap {
// Slots for open (in-memory) layers; LayerId index -(i + 1) maps to slot i.
open_layers: Vec<GlobalLayerEntry>,
// Position of the clock hand in `open_layers`, used by `find_victim`.
clock_arm: u32,
// Number of slots in `open_layers` that currently hold a layer.
num_open_layers: usize,
// True while an EvictSomeLayer job is considered pending, so that
// `insert_open` does not queue another one.
eviction_scheduled: bool,
// Slots for historic (on-disk) layers; LayerId index i + 1 maps to slot i.
historic_layers: Vec<GlobalLayerEntry>,
}
impl GlobalLayerMap {
pub fn new() -> GlobalLayerMap {
GlobalLayerMap {
open_layers: Vec::new(),
clock_arm: 0,
historic_layers: Vec::new(),
eviction_scheduled: false,
num_open_layers: 0,
}
}
pub fn get(&mut self, layer_id: LayerId) -> Option<Arc<dyn Layer>> {
let e = if layer_id.is_historic() {
let idx = (layer_id.index - 1) as usize;
&mut self.historic_layers[idx]
} else {
let idx = ((-layer_id.index) - 1) as usize;
&mut self.open_layers[idx]
};
if e.usage_count < 5 {
e.usage_count += 1;
}
e.layer.clone()
}
pub fn insert_open(&mut self, layer: Arc<InMemoryLayer>) -> LayerId {
let index = -(self.open_layers.len() as isize + 1);
let entry = GlobalLayerEntry {
layer: Some(layer),
usage_count: 1,
tag: 1,
};
let tag = entry.tag;
self.open_layers.push(entry);
self.num_open_layers += 1;
NUM_INMEMORY_LAYERS.inc();
if !self.eviction_scheduled && self.num_open_layers >= MAX_OPEN_LAYERS {
info!("scheduling global eviction");
schedule_job(GlobalJob::EvictSomeLayer);
self.eviction_scheduled = true;
}
LayerId {
index,
tag,
}
}
pub fn insert_historic(&mut self, layer: Arc<dyn Layer>) -> LayerId {
let index = self.historic_layers.len() as isize + 1;
let entry = GlobalLayerEntry {
layer: Some(layer),
usage_count: 1,
tag: 1,
};
let tag = entry.tag;
self.historic_layers.push(entry);
NUM_ONDISK_LAYERS.inc();
LayerId {
index,
tag,
}
}
pub fn remove(&mut self, layer_id: LayerId) -> Option<Arc<dyn Layer>> {
let old_layer;
if layer_id.is_historic() {
let idx = (layer_id.index - 1) as usize;
old_layer = self.historic_layers[idx].layer.take();
if old_layer.is_some() {
NUM_ONDISK_LAYERS.dec();
}
} else {
let idx = ((-layer_id.index) - 1) as usize;
old_layer = self.open_layers[idx].layer.take();
if old_layer.is_some() {
NUM_INMEMORY_LAYERS.dec();
self.num_open_layers -= 1;
}
}
old_layer
}
pub fn find_victim(&mut self) -> Option<LayerId> {
// run the clock algorithm among all open layers
for _ in 0..self.open_layers.len() * 5 {
self.clock_arm += 1;
if self.clock_arm >= self.open_layers.len() as u32 {
self.clock_arm = 0;
}
let next = self.clock_arm as usize;
if self.open_layers[next].usage_count == 0 {
return Some(LayerId {
index: -((next + 1) as isize),
tag: self.open_layers[next].tag,
});
} else {
self.open_layers[next].usage_count -= 1;
}
}
None
}
}
/// Pick the next open layer that should be evicted, if eviction is needed.
///
/// Returns `None` when fewer than `MAX_OPEN_LAYERS` layers are open, or when
/// the clock algorithm cannot find a victim. In both cases the pending
/// eviction is considered finished and the `eviction_scheduled` flag is
/// cleared, so that a later `insert_open` can schedule a new eviction job.
/// Leaving the flag set when no victim was found would prevent eviction from
/// ever being scheduled again.
pub fn find_victim() -> Option<LayerId> {
    let mut l = LAYERS.lock().unwrap();

    if l.num_open_layers < MAX_OPEN_LAYERS {
        info!("no victim needed at {} open layers", l.num_open_layers);
        l.eviction_scheduled = false;
        return None;
    }

    let victim = l.find_victim();
    match victim {
        Some(_) => {
            info!("found victim out of {} open layers", l.num_open_layers);
        }
        None => {
            info!("no victim found at {} open layers", l.num_open_layers);
            // The caller stops evicting when it gets None; re-arm scheduling
            // so the next inserted layer triggers another attempt.
            l.eviction_scheduled = false;
        }
    }
    victim
}
/// Look up a layer in the global registry by its id.
///
/// Takes the registry lock for the duration of the lookup; the lookup also
/// counts as a use of the layer for eviction purposes.
pub fn get_layer(layer_id: LayerId) -> Option<Arc<dyn Layer>> {
    let mut registry = LAYERS.lock().unwrap();
    registry.get(layer_id)
}
/// Handle to a layer in the global layer registry.
///
/// `Debug` and `Hash` are derived so that ids can be logged, compared with
/// `assert_eq!`, and used as keys in hashed collections.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct LayerId {
    /// Slot number plus one for historic layers (positive), or the negated
    /// slot number minus one for open layers (negative). Zero is never handed
    /// out.
    index: isize,
    /// Copy of the slot's tag at the time the id was created.
    tag: u64,
}
impl LayerId {
/// Returns true if this id has a positive index, which is how ids of
/// historic (on-disk) layers are numbered; open layers get negative indexes.
pub fn is_historic(&self) -> bool {
self.index > 0
}
} }
/// ///
@@ -41,7 +229,7 @@ pub struct LayerMap {
/// All in-memory layers, ordered by 'oldest_pending_lsn' and generation /// All in-memory layers, ordered by 'oldest_pending_lsn' and generation
/// of each layer. This allows easy access to the in-memory layer that /// of each layer. This allows easy access to the in-memory layer that
/// contains the oldest WAL record. /// contains the oldest WAL record.
open_layers: BinaryHeap<OpenLayerEntry>, open_layers: BinaryHeap<OpenLayerHeapEntry>,
/// Generation number, used to distinguish newly inserted entries in the /// Generation number, used to distinguish newly inserted entries in the
/// binary heap from older entries during checkpoint. /// binary heap from older entries during checkpoint.
@@ -61,23 +249,32 @@ impl LayerMap {
segentry.get(lsn) segentry.get(lsn)
} }
pub fn get_with_id(&self, layer_id: LayerId) -> Option<Arc<dyn Layer>> {
// TODO: check that it belongs to this tenant+timeline
LAYERS.lock().unwrap().get(layer_id)
}
/// ///
/// Get the open layer for given segment for writing. Or None if no open /// Get the open layer for given segment for writing. Or None if no open
/// layer exists. /// layer exists.
/// ///
pub fn get_open(&self, tag: &SegmentTag) -> Option<Arc<InMemoryLayer>> { pub fn get_open(&self, tag: &SegmentTag) -> Option<Arc<dyn Layer>> {
let segentry = self.segs.get(tag)?; let segentry = self.segs.get(tag)?;
segentry.open.as_ref().map(Arc::clone) let (layer_id, _start_lsn) = segentry.open?;
LAYERS.lock().unwrap().get(layer_id)
} }
/// ///
/// Insert an open in-memory layer /// Insert an open in-memory layer
/// ///
pub fn insert_open(&mut self, layer: Arc<InMemoryLayer>) { pub fn insert_open(&mut self, layer: Arc<InMemoryLayer>) {
let layer_id = LAYERS.lock().unwrap().insert_open(Arc::clone(&layer));
let segentry = self.segs.entry(layer.get_seg_tag()).or_default(); let segentry = self.segs.entry(layer.get_seg_tag()).or_default();
segentry.update_open(Arc::clone(&layer)); segentry.update_open(layer_id, layer.get_start_lsn());
let oldest_pending_lsn = layer.get_oldest_pending_lsn(); let oldest_pending_lsn = layer.get_oldest_pending_lsn();
@@ -87,58 +284,52 @@ impl LayerMap {
assert!(oldest_pending_lsn.is_aligned()); assert!(oldest_pending_lsn.is_aligned());
// Also add it to the binary heap // Also add it to the binary heap
let open_layer_entry = OpenLayerEntry { let open_layer_entry = OpenLayerHeapEntry {
oldest_pending_lsn: layer.get_oldest_pending_lsn(), oldest_pending_lsn: layer.get_oldest_pending_lsn(),
layer, layer_id,
generation: self.current_generation, generation: self.current_generation,
}; };
self.open_layers.push(open_layer_entry); self.open_layers.push(open_layer_entry);
NUM_INMEMORY_LAYERS.inc();
} }
/// Remove the oldest in-memory layer /// Remove a layer
pub fn pop_oldest_open(&mut self) { pub fn remove(&mut self, layer_id: LayerId) {
// Pop it from the binary heap if let Some(layer) = LAYERS.lock().unwrap().remove(layer_id) {
let oldest_entry = self.open_layers.pop().unwrap(); // Also remove it from the SegEntry of this segment
let segtag = oldest_entry.layer.get_seg_tag(); if layer_id.is_historic() {
let tag = layer.get_seg_tag();
// Also remove it from the SegEntry of this segment if let Some(segentry) = self.segs.get_mut(&tag) {
let mut segentry = self.segs.get_mut(&segtag).unwrap(); segentry.historic.remove(&HistoricLayerIntervalTreeEntry::new(layer_id, layer));
if Arc::ptr_eq(segentry.open.as_ref().unwrap(), &oldest_entry.layer) { }
segentry.open = None; } else {
} else { let segtag = layer.get_seg_tag();
// We could have already updated segentry.open for let mut segentry = self.segs.get_mut(&segtag).unwrap();
// dropped (non-writeable) layer. This is fine. if let Some(open) = segentry.open {
assert!(!oldest_entry.layer.is_writeable()); if open.0 == layer_id {
assert!(oldest_entry.layer.is_dropped()); segentry.open = None;
}
} else {
// We could have already updated segentry.open for
// dropped (non-writeable) layer. This is fine.
//assert!(!layer.is_writeable());
//assert!(layer.is_dropped());
}
}
} }
NUM_INMEMORY_LAYERS.dec();
} }
/// ///
/// Insert an on-disk layer /// Insert an on-disk layer
/// ///
pub fn insert_historic(&mut self, layer: Arc<dyn Layer>) { pub fn insert_historic(&mut self, layer: Arc<dyn Layer>) -> LayerId {
let layer_id = LAYERS.lock().unwrap().insert_historic(Arc::clone(&layer));
let segentry = self.segs.entry(layer.get_seg_tag()).or_default(); let segentry = self.segs.entry(layer.get_seg_tag()).or_default();
segentry.insert_historic(layer); segentry.insert_historic(layer_id, layer);
NUM_ONDISK_LAYERS.inc(); layer_id
}
///
/// Remove an on-disk layer from the map.
///
/// This should be called when the corresponding file on disk has been deleted.
///
pub fn remove_historic(&mut self, layer: Arc<dyn Layer>) {
let tag = layer.get_seg_tag();
if let Some(segentry) = self.segs.get_mut(&tag) {
segentry.historic.remove(&layer);
}
NUM_ONDISK_LAYERS.dec();
} }
// List relations along with a flag that marks if they exist at the given lsn. // List relations along with a flag that marks if they exist at the given lsn.
@@ -199,10 +390,18 @@ impl LayerMap {
} }
/// Return the oldest in-memory layer, along with its generation number. /// Return the oldest in-memory layer, along with its generation number.
pub fn peek_oldest_open(&self) -> Option<(Arc<InMemoryLayer>, u64)> { pub fn peek_oldest_open(&mut self) -> Option<(LayerId, Arc<dyn Layer>, u64)> {
self.open_layers
.peek() while let Some(oldest_entry) = self.open_layers.peek() {
.map(|oldest_entry| (Arc::clone(&oldest_entry.layer), oldest_entry.generation)) if let Some(layer) = LAYERS.lock().unwrap().get(oldest_entry.layer_id) {
return Some((oldest_entry.layer_id,
layer,
oldest_entry.generation));
} else {
self.open_layers.pop();
}
}
None
} }
/// Increment the generation number used to stamp open in-memory layers. Layers /// Increment the generation number used to stamp open in-memory layers. Layers
@@ -220,6 +419,7 @@ impl LayerMap {
} }
} }
/*
/// debugging function to print out the contents of the layer map /// debugging function to print out the contents of the layer map
#[allow(unused)] #[allow(unused)]
pub fn dump(&self) -> Result<()> { pub fn dump(&self) -> Result<()> {
@@ -236,16 +436,39 @@ impl LayerMap {
println!("End dump LayerMap"); println!("End dump LayerMap");
Ok(()) Ok(())
} }
*/
} }
impl IntervalItem for dyn Layer { #[derive(Clone)]
struct HistoricLayerIntervalTreeEntry {
layer_id: LayerId,
start_lsn: Lsn,
end_lsn: Lsn,
}
impl HistoricLayerIntervalTreeEntry {
fn new(layer_id: LayerId, layer: Arc<dyn Layer>) -> HistoricLayerIntervalTreeEntry{
HistoricLayerIntervalTreeEntry {
layer_id,
start_lsn: layer.get_start_lsn(),
end_lsn: layer.get_end_lsn(),
}
}
}
impl PartialEq for HistoricLayerIntervalTreeEntry {
fn eq(&self, other: &Self) -> bool {
self.layer_id == other.layer_id
}
}
impl IntervalItem for HistoricLayerIntervalTreeEntry {
type Key = Lsn; type Key = Lsn;
fn start_key(&self) -> Lsn { fn start_key(&self) -> Lsn {
self.get_start_lsn() self.start_lsn
} }
fn end_key(&self) -> Lsn { fn end_key(&self) -> Lsn {
self.get_end_lsn() self.end_lsn
} }
} }
@@ -259,8 +482,8 @@ impl IntervalItem for dyn Layer {
/// IntervalTree. /// IntervalTree.
#[derive(Default)] #[derive(Default)]
struct SegEntry { struct SegEntry {
open: Option<Arc<InMemoryLayer>>, open: Option<(LayerId, Lsn)>,
historic: IntervalTree<dyn Layer>, historic: IntervalTree<HistoricLayerIntervalTreeEntry>,
} }
impl SegEntry { impl SegEntry {
@@ -276,13 +499,18 @@ impl SegEntry {
pub fn get(&self, lsn: Lsn) -> Option<Arc<dyn Layer>> { pub fn get(&self, lsn: Lsn) -> Option<Arc<dyn Layer>> {
if let Some(open) = &self.open { if let Some(open) = &self.open {
if open.get_start_lsn() <= lsn { if let Some(layer) = LAYERS.lock().unwrap().get(open.0) {
let x: Arc<dyn Layer> = Arc::clone(open) as _; if layer.get_start_lsn() <= lsn {
return Some(x); return Some(layer);
}
} }
} }
self.historic.search(lsn) if let Some(historic) = self.historic.search(lsn) {
Some(LAYERS.lock().unwrap().get(historic.layer_id).unwrap())
} else {
None
}
} }
pub fn newer_image_layer_exists(&self, lsn: Lsn) -> bool { pub fn newer_image_layer_exists(&self, lsn: Lsn) -> bool {
@@ -291,21 +519,25 @@ impl SegEntry {
self.historic self.historic
.iter_newer(lsn) .iter_newer(lsn)
.any(|layer| !layer.is_incremental()) .any(|e| {
let layer = LAYERS.lock().unwrap().get(e.layer_id).unwrap();
!layer.is_incremental()
}
)
} }
// Set new open layer for a SegEntry. // Set new open layer for a SegEntry.
// It's ok to rewrite previous open layer, // It's ok to rewrite previous open layer,
// but only if it is not writeable anymore. // but only if it is not writeable anymore.
pub fn update_open(&mut self, layer: Arc<InMemoryLayer>) { pub fn update_open(&mut self, layer_id: LayerId, start_lsn: Lsn) {
if let Some(prev_open) = &self.open { if let Some(_prev_open) = &self.open {
assert!(!prev_open.is_writeable()); //assert!(!prev_open.is_writeable());
} }
self.open = Some(layer); self.open = Some((layer_id, start_lsn));
} }
pub fn insert_historic(&mut self, layer: Arc<dyn Layer>) { pub fn insert_historic(&mut self, layer_id: LayerId, layer: Arc<dyn Layer>) {
self.historic.insert(layer); self.historic.insert(&HistoricLayerIntervalTreeEntry::new(layer_id, layer));
} }
} }
@@ -315,12 +547,12 @@ impl SegEntry {
/// The generation number associated with each entry can be used to distinguish /// The generation number associated with each entry can be used to distinguish
/// recently-added entries (i.e after last call to increment_generation()) from older /// recently-added entries (i.e after last call to increment_generation()) from older
/// entries with the same 'oldest_pending_lsn'. /// entries with the same 'oldest_pending_lsn'.
struct OpenLayerEntry { struct OpenLayerHeapEntry {
pub oldest_pending_lsn: Lsn, // copy of layer.get_oldest_pending_lsn() pub oldest_pending_lsn: Lsn, // copy of layer.get_oldest_pending_lsn()
pub generation: u64, pub generation: u64,
pub layer: Arc<InMemoryLayer>, pub layer_id: LayerId,
} }
impl Ord for OpenLayerEntry { impl Ord for OpenLayerHeapEntry {
fn cmp(&self, other: &Self) -> Ordering { fn cmp(&self, other: &Self) -> Ordering {
// BinaryHeap is a max-heap, and we want a min-heap. Reverse the ordering here // BinaryHeap is a max-heap, and we want a min-heap. Reverse the ordering here
// to get that. Entries with identical oldest_pending_lsn are ordered by generation // to get that. Entries with identical oldest_pending_lsn are ordered by generation
@@ -330,32 +562,33 @@ impl Ord for OpenLayerEntry {
.then_with(|| other.generation.cmp(&self.generation)) .then_with(|| other.generation.cmp(&self.generation))
} }
} }
impl PartialOrd for OpenLayerEntry { impl PartialOrd for OpenLayerHeapEntry {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> { fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other)) Some(self.cmp(other))
} }
} }
impl PartialEq for OpenLayerEntry { impl PartialEq for OpenLayerHeapEntry {
fn eq(&self, other: &Self) -> bool { fn eq(&self, other: &Self) -> bool {
self.cmp(other) == Ordering::Equal self.cmp(other) == Ordering::Equal
} }
} }
impl Eq for OpenLayerEntry {} impl Eq for OpenLayerHeapEntry {}
/// Iterator returned by LayerMap::iter_historic_layers() /// Iterator returned by LayerMap::iter_historic_layers()
pub struct HistoricLayerIter<'a> { pub struct HistoricLayerIter<'a> {
seg_iter: std::collections::hash_map::Iter<'a, SegmentTag, SegEntry>, seg_iter: std::collections::hash_map::Iter<'a, SegmentTag, SegEntry>,
iter: Option<IntervalIter<'a, dyn Layer>>, iter: Option<IntervalIter<'a, HistoricLayerIntervalTreeEntry>>,
} }
impl<'a> Iterator for HistoricLayerIter<'a> { impl<'a> Iterator for HistoricLayerIter<'a> {
type Item = Arc<dyn Layer>; type Item = (LayerId, Arc<dyn Layer>);
fn next(&mut self) -> std::option::Option<<Self as std::iter::Iterator>::Item> { fn next(&mut self) -> std::option::Option<<Self as std::iter::Iterator>::Item> {
loop { loop {
if let Some(x) = &mut self.iter { if let Some(x) = &mut self.iter {
if let Some(x) = x.next() { if let Some(x) = x.next() {
return Some(Arc::clone(&x)); let layer = LAYERS.lock().unwrap().get(x.layer_id).unwrap();
return Some((x.layer_id, layer));
} }
} }
if let Some((_tag, segentry)) = self.seg_iter.next() { if let Some((_tag, segentry)) = self.seg_iter.next() {
@@ -426,10 +659,10 @@ mod tests {
// A helper function (closure) to pop the next oldest open entry from the layer map, // A helper function (closure) to pop the next oldest open entry from the layer map,
// and assert that it is what we'd expect // and assert that it is what we'd expect
let mut assert_pop_layer = |expected_segno: u32, expected_generation: u64| { let mut assert_pop_layer = |expected_segno: u32, expected_generation: u64| {
let (l, generation) = layers.peek_oldest_open().unwrap(); let (layer_id, l, generation) = layers.peek_oldest_open().unwrap();
assert!(l.get_seg_tag().segno == expected_segno); assert!(l.get_seg_tag().segno == expected_segno);
assert!(generation == expected_generation); assert!(generation == expected_generation);
layers.pop_oldest_open(); layers.remove(layer_id);
}; };
assert_pop_layer(0, gen1); // 0x100 assert_pop_layer(0, gen1); // 0x100

View File

@@ -2,9 +2,10 @@
//! Common traits and structs for layers //! Common traits and structs for layers
//! //!
use crate::layered_repository::InMemoryLayer;
use crate::relish::RelishTag; use crate::relish::RelishTag;
use crate::repository::WALRecord; use crate::repository::WALRecord;
use crate::ZTimelineId; use crate::{ZTenantId, ZTimelineId};
use anyhow::Result; use anyhow::Result;
use bytes::Bytes; use bytes::Bytes;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@@ -104,6 +105,14 @@ pub enum PageReconstructResult {
/// in-memory and on-disk layers. /// in-memory and on-disk layers.
/// ///
pub trait Layer: Send + Sync { pub trait Layer: Send + Sync {
fn upgrade_to_inmemory_layer(&self) -> Option<&InMemoryLayer> {
None
}
/// Identify the timeline this relish belongs to
fn get_tenant_id(&self) -> ZTenantId;
/// Identify the timeline this relish belongs to /// Identify the timeline this relish belongs to
fn get_timeline_id(&self) -> ZTimelineId; fn get_timeline_id(&self) -> ZTimelineId;

View File

@@ -21,6 +21,7 @@ pub mod tenant_mgr;
pub mod waldecoder; pub mod waldecoder;
pub mod walreceiver; pub mod walreceiver;
pub mod walredo; pub mod walredo;
pub mod vfd;
pub mod defaults { pub mod defaults {
use const_format::formatcp; use const_format::formatcp;

View File

@@ -690,7 +690,7 @@ impl postgres_backend::Handler for PageServerHandler {
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?; let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
let result = repo.gc_iteration(Some(timelineid), gc_horizon, true)?; let result = repo.gc_manual(Some(timelineid), gc_horizon, true)?;
pgb.write_message_noflush(&BeMessage::RowDescription(&[ pgb.write_message_noflush(&BeMessage::RowDescription(&[
RowDescriptor::int8_col(b"layer_relfiles_total"), RowDescriptor::int8_col(b"layer_relfiles_total"),

View File

@@ -33,12 +33,16 @@ pub trait Repository: Send + Sync {
/// `checkpoint_before_gc` parameter is used to force compaction of storage before CG /// `checkpoint_before_gc` parameter is used to force compaction of storage before CG
/// to make tests more deterministic. /// to make tests more deterministic.
/// TODO Do we still need it or we can call checkpoint explicitly in tests where needed? /// TODO Do we still need it or we can call checkpoint explicitly in tests where needed?
fn gc_iteration( fn gc_manual(
&self, &self,
timelineid: Option<ZTimelineId>, timelineid: Option<ZTimelineId>,
horizon: u64, horizon: u64,
checkpoint_before_gc: bool, checkpoint_before_gc: bool,
) -> Result<GcResult>; ) -> Result<GcResult>;
fn gc_scheduled(&self) -> Result<GcResult>;
fn upgrade_to_layered_repository(&self) -> &crate::layered_repository::LayeredRepository;
} }
/// ///
@@ -144,7 +148,9 @@ pub trait Timeline: Send + Sync {
/// ///
/// NOTE: This has nothing to do with checkpoint in PostgreSQL. We don't /// NOTE: This has nothing to do with checkpoint in PostgreSQL. We don't
/// know anything about them here in the repository. /// know anything about them here in the repository.
fn checkpoint(&self) -> Result<()>; fn checkpoint_forced(&self) -> Result<()>;
fn checkpoint_scheduled(&self) -> Result<()>;
/// Retrieve current logical size of the timeline /// Retrieve current logical size of the timeline
/// ///
@@ -155,6 +161,8 @@ pub trait Timeline: Send + Sync {
/// Does the same as get_current_logical_size but counted on demand. /// Does the same as get_current_logical_size but counted on demand.
/// Used in tests to ensure thet incremental and non incremental variants match. /// Used in tests to ensure thet incremental and non incremental variants match.
fn get_current_logical_size_non_incremental(&self, lsn: Lsn) -> Result<usize>; fn get_current_logical_size_non_incremental(&self, lsn: Lsn) -> Result<usize>;
fn upgrade_to_layered_timeline(&self) -> &crate::layered_repository::LayeredTimeline;
} }
/// Various functions to mutate the timeline. /// Various functions to mutate the timeline.
@@ -714,8 +722,8 @@ mod tests {
.contains(&TESTREL_A)); .contains(&TESTREL_A));
// Run checkpoint and garbage collection and check that it's still not visible // Run checkpoint and garbage collection and check that it's still not visible
newtline.checkpoint()?; newtline.checkpoint_forced()?;
repo.gc_iteration(Some(NEW_TIMELINE_ID), 0, true)?; repo.gc_manual(Some(NEW_TIMELINE_ID), 0, true)?;
assert!(!newtline assert!(!newtline
.list_rels(0, TESTDB, Lsn(0x40))? .list_rels(0, TESTDB, Lsn(0x40))?

View File

@@ -106,17 +106,6 @@ fn init_repo(conf: &'static PageServerConf, tenant_id: ZTenantId) {
true, true,
)); ));
let checkpointer_handle = LayeredRepository::launch_checkpointer_thread(conf, repo.clone());
let gc_handle = LayeredRepository::launch_gc_thread(conf, repo.clone());
let mut handles = TENANT_HANDLES.lock().unwrap();
let h = TenantHandleEntry {
checkpointer_handle: Some(checkpointer_handle),
gc_handle: Some(gc_handle),
};
handles.insert(tenant_id, h);
let mut m = access_tenants(); let mut m = access_tenants();
let tenant = m.get_mut(&tenant_id).unwrap(); let tenant = m.get_mut(&tenant_id).unwrap();
tenant.repo = Some(repo); tenant.repo = Some(repo);

155
pageserver/src/vfd.rs Normal file
View File

@@ -0,0 +1,155 @@
use std::fs::File;
use std::io::Seek;
use std::path::{Path, PathBuf};
use std::sync::Mutex;
use lazy_static::lazy_static;
const INVALID_TAG: u64 = u64::MAX;
/// Process-wide table of cached open file handles, shared by all `VirtualFile`s.
///
/// `VirtualFile::cache` grows the table until it reaches a fixed cap and then
/// starts reusing slots from index 0 again.
struct OpenFiles {
/// Index of the slot that the next call to `VirtualFile::cache` will use.
/// May be equal to `files.len()`, in which case `cache` either appends a new
/// slot or wraps around to slot 0.
next: usize,
/// The slots themselves; a `VirtualFile` refers to one by index (`vfd`).
files: Vec<OpenFile>,
}
// The single global slot table, guarded by a mutex. It starts out empty;
// slots are only created by `VirtualFile::cache`.
lazy_static! {
static ref OPEN_FILES: Mutex<OpenFiles> = Mutex::new(OpenFiles {
next: 0,
files: Vec::new(),
});
}
/// One slot in the global `OpenFiles` table.
struct OpenFile {
/// Incremented every time `VirtualFile::cache` stores a file in this slot.
/// A `VirtualFile` remembers the value it saw, so it can tell whether the
/// slot has since been reused for another file.
tag: u64,
/// The cached handle, or `None` if the slot is empty (never filled, or the
/// handle was taken back out by `VirtualFile::open` or by `Drop`).
file: Option<File>,
}
/// A file identified by path whose open handle may be parked in the global
/// slot table between uses, so that it does not have to be reopened each time.
pub struct VirtualFile {
/// Index of the slot this file's handle was last cached in. Only meaningful
/// while the slot's tag still equals `tag`.
vfd: usize,
/// The slot's tag at the time the handle was cached, or `INVALID_TAG` if
/// nothing has been cached for this file yet.
tag: u64,
/// Path used to open the file when no cached handle is available.
path: PathBuf,
}
impl VirtualFile {
    /// Creates a handle for `path` without opening anything.
    ///
    /// The new handle has no cached descriptor (`tag` is `INVALID_TAG`), so the
    /// first call to [`VirtualFile::open`] opens the file from disk.
    pub fn new(path: &Path) -> VirtualFile {
        VirtualFile {
            vfd: 0,
            tag: INVALID_TAG,
            path: path.to_path_buf(),
        }
    }

    /// Returns an open `File` for this path, positioned at the start.
    ///
    /// If a descriptor previously handed to [`VirtualFile::cache`] is still in
    /// its slot, it is taken out of the slot, rewound and returned. Otherwise
    /// the file is opened afresh (read-only, via `File::open`).
    ///
    /// # Errors
    ///
    /// Returns the I/O error from rewinding the cached descriptor or from
    /// opening the file. If rewinding fails the cached descriptor is closed.
    pub fn open(&mut self) -> std::io::Result<File> {
        // Hold the global lock only long enough to take the handle out of its
        // slot. The seek and (on a miss) the open are system calls and must not
        // block every other thread that wants to touch the cache.
        let cached = {
            let mut open_files = OPEN_FILES.lock().unwrap();
            match open_files.files.get_mut(self.vfd) {
                // The tag check tells us the slot has not been reused for
                // another file since we cached ours there.
                Some(slot) if slot.tag == self.tag => slot.file.take(),
                _ => None,
            }
        };

        match cached {
            Some(mut file) => {
                file.rewind()?;
                Ok(file)
            }
            None => File::open(&self.path),
        }
    }

    /// Parks `file` in the global cache so that a later [`VirtualFile::open`]
    /// can reuse it instead of reopening the path.
    ///
    /// Slots are handed out round-robin. Once the table has reached its cap the
    /// oldest slots are reused, which closes whatever descriptor they held.
    pub fn cache(&mut self, file: File) {
        /// Upper bound on the number of descriptors kept open by the cache.
        const MAX_CACHED_FILES: usize = 100;

        let displaced = {
            let mut open_files = OPEN_FILES.lock().unwrap();

            // If our previous slot still holds a descriptor (cache() called
            // twice without an open() in between), release it now. Otherwise
            // it would stay open, unreachable, until the slot is recycled.
            let stale = match open_files.files.get_mut(self.vfd) {
                Some(slot) if slot.tag == self.tag => slot.file.take(),
                _ => None,
            };

            let idx = if open_files.next < open_files.files.len() {
                open_files.next
            } else if open_files.files.len() < MAX_CACHED_FILES {
                open_files.files.push(OpenFile { tag: 0, file: None });
                open_files.files.len() - 1
            } else {
                // Table is full: wrap around and start evicting from slot 0.
                0
            };
            open_files.next = idx + 1;

            let slot = &mut open_files.files[idx];
            let evicted = slot.file.replace(file);
            // Bumping the tag invalidates the previous owner's reference to
            // this slot.
            slot.tag += 1;
            self.vfd = idx;
            self.tag = slot.tag;

            (stale, evicted)
        };

        // Close any displaced descriptors only after the lock is released.
        drop(displaced);
    }
}
impl Drop for VirtualFile {
    /// Closes this file's cached descriptor, if it is still sitting in its slot.
    ///
    /// Nothing happens if the file was never cached, or if the slot has since
    /// been reused for a different file (the tags no longer match).
    fn drop(&mut self) {
        if self.tag == INVALID_TAG {
            // Never cached, so there is nothing to release.
            return;
        }

        let cached = {
            // A destructor must not panic: if another thread panicked while
            // holding the lock, recover the guard instead of unwrapping, since
            // a panic here during unwinding would abort the process.
            let mut open_files = OPEN_FILES
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            match open_files.files.get_mut(self.vfd) {
                Some(slot) if slot.tag == self.tag => slot.file.take(),
                _ => None,
            }
        };

        // Close the descriptor after the global lock has been released.
        drop(cached);
    }
}
#[cfg(test)]
mod tests {
    use crate::PageServerConf;
    use super::*;
    use std::io::Read;

    /// Opens `vfile`, asserts that it reads back exactly `expected`, and then
    /// returns the handle to the cache.
    fn check_and_recache(vfile: &mut VirtualFile, expected: &str) -> anyhow::Result<()> {
        let mut file = vfile.open()?;
        let mut contents = String::new();
        file.read_to_string(&mut contents)?;
        assert!(contents == expected);
        vfile.cache(file);
        Ok(())
    }

    /// Creates many more files than the cache has slots, then reads each one
    /// in turn, re-reading the first file after every step, so that both the
    /// cached and the evicted paths are exercised.
    #[test]
    fn test_vfd() -> anyhow::Result<()> {
        let test_dir = PageServerConf::test_repo_dir("test_vfd");
        let _ = std::fs::remove_dir_all(&test_dir);
        std::fs::create_dir_all(&test_dir)?;

        let mut vfiles = Vec::new();
        for n in 0..2000 {
            let path = test_dir.join(format!("vfd_test{}", n));
            let content = format!("foobar{}", n);
            std::fs::write(&path, &content)?;
            let vfile = VirtualFile::new(&path);
            vfiles.push((vfile, path, content));
        }

        for idx in 0..vfiles.len() {
            let (vfile, _, expected) = &mut vfiles[idx];
            check_and_recache(vfile, expected.as_str())?;

            let (first, _, expected) = &mut vfiles[0];
            check_and_recache(first, expected.as_str())?;
        }

        Ok(())
    }
}

View File

@@ -251,6 +251,9 @@ fn walreceiver_main(
last_rec_lsn = lsn; last_rec_lsn = lsn;
} }
timeline.upgrade_to_layered_timeline().schedule_checkpoint_if_needed()?;
timeline.upgrade_to_layered_timeline().schedule_gc_if_needed()?;
if !caught_up && endlsn >= end_of_wal { if !caught_up && endlsn >= end_of_wal {
info!("caught up at LSN {}", endlsn); info!("caught up at LSN {}", endlsn);
caught_up = true; caught_up = true;

View File

@@ -28,19 +28,21 @@ use std::fs::OpenOptions;
use std::io::prelude::*; use std::io::prelude::*;
use std::io::Error; use std::io::Error;
use std::path::PathBuf; use std::path::PathBuf;
use std::process::{ChildStdin, ChildStdout, ChildStderr, Command};
use std::process::Stdio; use std::process::Stdio;
use std::sync::Mutex; use std::sync::Mutex;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration; use std::time::Duration;
use std::time::Instant; use std::time::Instant;
use tokio::io::AsyncBufReadExt;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::process::{ChildStdin, ChildStdout, Command};
use tokio::time::timeout;
use zenith_metrics::{register_histogram, register_int_counter, Histogram, IntCounter}; use zenith_metrics::{register_histogram, register_int_counter, Histogram, IntCounter};
use zenith_utils::bin_ser::BeSer; use zenith_utils::bin_ser::BeSer;
use zenith_utils::lsn::Lsn; use zenith_utils::lsn::Lsn;
use zenith_utils::zid::ZTenantId; use zenith_utils::zid::ZTenantId;
use std::os::unix::io::AsRawFd;
use nix::poll::*;
use crate::relish::*; use crate::relish::*;
use crate::repository::WALRecord; use crate::repository::WALRecord;
use crate::waldecoder::XlMultiXactCreate; use crate::waldecoder::XlMultiXactCreate;
@@ -133,14 +135,14 @@ lazy_static! {
/// perform WAL replay. Only one thread can use the processs at a time, /// perform WAL replay. Only one thread can use the processs at a time,
/// that is controlled by the Mutex. In the future, we might want to /// that is controlled by the Mutex. In the future, we might want to
/// launch a pool of processes to allow concurrent replay of multiple /// launch a pool of processes to allow concurrent replay of multiple
/// records. /// records. FIXME we have a pool now
/// ///
pub struct PostgresRedoManager { pub struct PostgresRedoManager {
tenantid: ZTenantId, tenantid: ZTenantId,
conf: &'static PageServerConf, conf: &'static PageServerConf,
runtime: tokio::runtime::Runtime, processes: Vec<Mutex<Option<PostgresRedoProcess>>>,
process: Mutex<Option<PostgresRedoProcess>>, next: AtomicUsize,
} }
#[derive(Debug)] #[derive(Debug)]
@@ -210,21 +212,18 @@ impl WalRedoManager for PostgresRedoManager {
end_time = Instant::now(); end_time = Instant::now();
WAL_REDO_TIME.observe(end_time.duration_since(start_time).as_secs_f64()); WAL_REDO_TIME.observe(end_time.duration_since(start_time).as_secs_f64());
} else { } else {
let mut process_guard = self.process.lock().unwrap(); let process_no = self.next.fetch_add(1, Ordering::AcqRel) % self.processes.len();
let mut process_guard = self.processes[process_no].lock().unwrap();
let lock_time = Instant::now(); let lock_time = Instant::now();
// launch the WAL redo process on first use // launch the WAL redo process on first use
if process_guard.is_none() { if process_guard.is_none() {
let p = self let p = PostgresRedoProcess::launch(self.conf, process_no, &self.tenantid)?;
.runtime
.block_on(PostgresRedoProcess::launch(self.conf, &self.tenantid))?;
*process_guard = Some(p); *process_guard = Some(p);
} }
let process = process_guard.as_mut().unwrap(); let process = process_guard.as_mut().unwrap();
result = self result = self.handle_apply_request_postgres(process, &request);
.runtime
.block_on(self.handle_apply_request_postgres(process, &request));
WAL_REDO_WAIT_TIME.observe(lock_time.duration_since(start_time).as_secs_f64()); WAL_REDO_WAIT_TIME.observe(lock_time.duration_since(start_time).as_secs_f64());
end_time = Instant::now(); end_time = Instant::now();
@@ -240,27 +239,24 @@ impl PostgresRedoManager {
/// Create a new PostgresRedoManager. /// Create a new PostgresRedoManager.
/// ///
pub fn new(conf: &'static PageServerConf, tenantid: ZTenantId) -> PostgresRedoManager { pub fn new(conf: &'static PageServerConf, tenantid: ZTenantId) -> PostgresRedoManager {
// We block on waiting for requests on the walredo request channel, but let mut processes: Vec<Mutex<Option<PostgresRedoProcess>>> = Vec::new();
// use async I/O to communicate with the child process. Initialize the for _ in 1..10 {
// runtime for the async part. processes.push(Mutex::new(None));
let runtime = tokio::runtime::Builder::new_current_thread() }
.enable_all()
.build()
.unwrap();
// The actual process is launched lazily, on first request. // The actual process is launched lazily, on first request.
PostgresRedoManager { PostgresRedoManager {
runtime,
tenantid, tenantid,
conf, conf,
process: Mutex::new(None), processes,
next: AtomicUsize::new(0),
} }
} }
/// ///
/// Process one request for WAL redo using wal-redo postgres /// Process one request for WAL redo using wal-redo postgres
/// ///
async fn handle_apply_request_postgres( fn handle_apply_request_postgres(
&self, &self,
process: &mut PostgresRedoProcess, process: &mut PostgresRedoProcess,
request: &WalRedoRequest, request: &WalRedoRequest,
@@ -278,14 +274,14 @@ impl PostgresRedoManager {
if let RelishTag::Relation(rel) = request.rel { if let RelishTag::Relation(rel) = request.rel {
// Relational WAL records are applied using wal-redo-postgres // Relational WAL records are applied using wal-redo-postgres
let buf_tag = BufferTag { rel, blknum }; let buf_tag = BufferTag { rel, blknum };
apply_result = process.apply_wal_records(buf_tag, base_img, records).await; apply_result = process.apply_wal_records(buf_tag, base_img, records);
let duration = start.elapsed(); let duration = start.elapsed();
debug!( trace!(
"postgres applied {} WAL records in {} ms to reconstruct page image at LSN {}", "postgres applied {} WAL records in {} us to reconstruct page image at LSN {}",
nrecords, nrecords,
duration.as_millis(), duration.as_micros(),
lsn lsn
); );
@@ -471,20 +467,22 @@ impl PostgresRedoManager {
struct PostgresRedoProcess { struct PostgresRedoProcess {
stdin: ChildStdin, stdin: ChildStdin,
stdout: ChildStdout, stdout: ChildStdout,
stderr: ChildStderr,
} }
impl PostgresRedoProcess { impl PostgresRedoProcess {
// //
// Start postgres binary in special WAL redo mode. // Start postgres binary in special WAL redo mode.
// //
async fn launch( fn launch(
conf: &PageServerConf, conf: &PageServerConf,
process_no: usize,
tenantid: &ZTenantId, tenantid: &ZTenantId,
) -> Result<PostgresRedoProcess, Error> { ) -> Result<PostgresRedoProcess, Error> {
// FIXME: We need a dummy Postgres cluster to run the process in. Currently, we // FIXME: We need a dummy Postgres cluster to run the process in. Currently, we
// just create one with constant name. That fails if you try to launch more than // just create one with constant name. That fails if you try to launch more than
// one WAL redo manager concurrently. // one WAL redo manager concurrently.
let datadir = conf.tenant_path(tenantid).join("wal-redo-datadir"); let datadir = conf.tenant_path(tenantid).join(format!("wal-redo-datadir-{}", process_no));
// Create empty data directory for wal-redo postgres, deleting old one first. // Create empty data directory for wal-redo postgres, deleting old one first.
if datadir.exists() { if datadir.exists() {
@@ -501,7 +499,6 @@ impl PostgresRedoProcess {
.env("LD_LIBRARY_PATH", conf.pg_lib_dir().to_str().unwrap()) .env("LD_LIBRARY_PATH", conf.pg_lib_dir().to_str().unwrap())
.env("DYLD_LIBRARY_PATH", conf.pg_lib_dir().to_str().unwrap()) .env("DYLD_LIBRARY_PATH", conf.pg_lib_dir().to_str().unwrap())
.output() .output()
.await
.expect("failed to execute initdb"); .expect("failed to execute initdb");
if !initdb.status.success() { if !initdb.status.success() {
@@ -538,102 +535,106 @@ impl PostgresRedoProcess {
datadir.display() datadir.display()
); );
let stdin = child.stdin.take().expect("failed to open child's stdin"); let stdin = child.stdin.take().unwrap();
let stderr = child.stderr.take().expect("failed to open child's stderr"); let stdout = child.stdout.take().unwrap();
let stdout = child.stdout.take().expect("failed to open child's stdout"); let stderr = child.stderr.take().unwrap();
Ok(PostgresRedoProcess { stdin, stdout, stderr })
// This async block reads the child's stderr, and forwards it to the logger
let f_stderr = async {
let mut stderr_buffered = tokio::io::BufReader::new(stderr);
let mut line = String::new();
loop {
let res = stderr_buffered.read_line(&mut line).await;
if res.is_err() {
debug!("could not convert line to utf-8");
continue;
}
if res.unwrap() == 0 {
break;
}
error!("wal-redo-postgres: {}", line.trim());
line.clear();
}
Ok::<(), Error>(())
};
tokio::spawn(f_stderr);
Ok(PostgresRedoProcess { stdin, stdout })
} }
// //
// Apply given WAL records ('records') over an old page image. Returns // Apply given WAL records ('records') over an old page image. Returns
// new page image. // new page image.
// //
async fn apply_wal_records( fn apply_wal_records(
&mut self, &mut self,
tag: BufferTag, tag: BufferTag,
base_img: Option<Bytes>, base_img: Option<Bytes>,
records: &[(Lsn, WALRecord)], records: &[(Lsn, WALRecord)],
) -> Result<Bytes, std::io::Error> { ) -> Result<Bytes, std::io::Error> {
let stdout = &mut self.stdout;
// Buffer the writes to avoid a lot of small syscalls.
let mut stdin = tokio::io::BufWriter::new(&mut self.stdin);
// We do three things simultaneously: send the old base image and WAL records to // Send base image, if any. (If the record initializes the page, previous page
// the child process's stdin, read the result from child's stdout, and forward any logging // version is not needed.)
// information that the child writes to its stderr to the page server's log. let mut buf: Vec<u8> = Vec::new();
// build_begin_redo_for_block_msg(tag, &mut buf);
// 'f_stdin' handles writing the base image and WAL records to the child process. if let Some(img) = base_img {
// 'f_stdout' below reads the result back. And 'f_stderr', which was spawned into the build_push_page_msg(tag, &img, &mut buf);
// tokio runtime in the 'launch' function already, forwards the logging. }
let f_stdin = async {
// Send base image, if any. (If the record initializes the page, previous page // Send WAL records.
// version is not needed.) for (lsn, rec) in records.iter() {
timeout( WAL_REDO_RECORD_COUNTER.inc();
TIMEOUT,
stdin.write_all(&build_begin_redo_for_block_msg(tag)), build_apply_record_msg(*lsn, &rec.rec, &mut buf);
)
.await??; //debug!("sent WAL record to wal redo postgres process ({:X}/{:X}",
if let Some(img) = base_img { // r.lsn >> 32, r.lsn & 0xffff_ffff);
timeout(TIMEOUT, stdin.write_all(&build_push_page_msg(tag, &img))).await??; }
//debug!("sent {} WAL records to wal redo postgres process ({:X}/{:X}",
// records.len(), lsn >> 32, lsn & 0xffff_ffff);
// Send GetPage command to get the result back
build_get_page_msg(tag, &mut buf);
// The input is now in 'buf'.
let mut nwrite = 0;
let mut resultbuf = Vec::new();
resultbuf.resize(8192, 0);
let mut nresult = 0;
let mut pollfds = [
PollFd::new(self.stdout.as_raw_fd(), PollFlags::POLLIN),
PollFd::new(self.stderr.as_raw_fd(), PollFlags::POLLIN),
PollFd::new(self.stdin.as_raw_fd(), PollFlags::POLLOUT),
];
// Do a blind write first
let n = self.stdin.write(&buf[nwrite..])?;
nwrite += n;
while nresult < 8192 {
let nfds = if nwrite < buf.len() {
3
} else {
2
};
nix::poll::poll(&mut pollfds[0..nfds], TIMEOUT.as_millis() as i32)?;
// We do three things simultaneously: send the old base image and WAL records to
// the child process's stdin, read the result from child's stdout, and forward any logging
// information that the child writes to its stderr to the page server's log.
//
// 'f_stdin' handles writing the base image and WAL records to the child process.
// 'f_stdout' below reads the result back. And 'f_stderr', which was spawned into the
// tokio runtime in the 'launch' function already, forwards the logging.
if nwrite < buf.len() && !pollfds[2].revents().unwrap().is_empty() {
// stdin
let n = self.stdin.write(&buf[nwrite..])?;
nwrite += n;
} }
if !pollfds[0].revents().unwrap().is_empty() {
// stdout
// Read back new page image
let n = self.stdout.read(&mut resultbuf[nresult..])?;
// Send WAL records. nresult += n;
for (lsn, rec) in records.iter() {
WAL_REDO_RECORD_COUNTER.inc();
stdin
.write_all(&build_apply_record_msg(*lsn, &rec.rec))
.await?;
//debug!("sent WAL record to wal redo postgres process ({:X}/{:X}",
// r.lsn >> 32, r.lsn & 0xffff_ffff);
} }
//debug!("sent {} WAL records to wal redo postgres process ({:X}/{:X}", if !pollfds[1].revents().unwrap().is_empty() {
// records.len(), lsn >> 32, lsn & 0xffff_ffff); // stderr
let mut readbuf: [u8; 16384] = [0; 16384];
// Send GetPage command to get the result back let n = self.stderr.read(&mut readbuf)?;
timeout(TIMEOUT, stdin.write_all(&build_get_page_msg(tag))).await??;
timeout(TIMEOUT, stdin.flush()).await??; error!("wal-redo-postgres: {}", String::from_utf8_lossy(&readbuf[0..n]));
}
//debug!("sent GetPage for {}", tag.blknum); //debug!("sent GetPage for {}", tag.blknum);
Ok::<(), Error>(()) }
};
// Read back new page image Ok(Bytes::from(Vec::from(resultbuf)))
let f_stdout = async {
let mut buf = [0u8; 8192];
timeout(TIMEOUT, stdout.read_exact(&mut buf)).await??;
//debug!("got response for {}", tag.blknum);
Ok::<[u8; 8192], Error>(buf)
};
let res = tokio::try_join!(f_stdout, f_stdin)?;
let buf = res.0;
Ok::<Bytes, Error>(Bytes::from(std::vec::Vec::from(buf)))
} }
} }
@@ -641,62 +642,50 @@ impl PostgresRedoProcess {
// process. See vendor/postgres/src/backend/tcop/zenith_wal_redo.c for // process. See vendor/postgres/src/backend/tcop/zenith_wal_redo.c for
// explanation of the protocol. // explanation of the protocol.
fn build_begin_redo_for_block_msg(tag: BufferTag) -> Vec<u8> { fn build_begin_redo_for_block_msg(tag: BufferTag, buf: &mut Vec<u8>) {
let len = 4 + 1 + 4 * 4; let len = 4 + 1 + 4 * 4;
let mut buf = Vec::with_capacity(1 + len);
buf.put_u8(b'B'); buf.put_u8(b'B');
buf.put_u32(len as u32); buf.put_u32(len as u32);
tag.ser_into(&mut buf) tag.ser_into(buf)
.expect("serialize BufferTag should always succeed"); .expect("serialize BufferTag should always succeed");
debug_assert!(buf.len() == 1 + len); //debug_assert!(buf.len() == 1 + len);
buf
} }
fn build_push_page_msg(tag: BufferTag, base_img: &[u8]) -> Vec<u8> { fn build_push_page_msg(tag: BufferTag, base_img: &[u8], buf: &mut Vec<u8>) {
assert!(base_img.len() == 8192); assert!(base_img.len() == 8192);
let len = 4 + 1 + 4 * 4 + base_img.len(); let len = 4 + 1 + 4 * 4 + base_img.len();
let mut buf = Vec::with_capacity(1 + len);
buf.put_u8(b'P'); buf.put_u8(b'P');
buf.put_u32(len as u32); buf.put_u32(len as u32);
tag.ser_into(&mut buf) tag.ser_into(buf)
.expect("serialize BufferTag should always succeed"); .expect("serialize BufferTag should always succeed");
buf.put(base_img); buf.put(base_img);
debug_assert!(buf.len() == 1 + len); //debug_assert!(buf.len() - oldlen == 1 + len);
buf
} }
fn build_apply_record_msg(endlsn: Lsn, rec: &[u8]) -> Vec<u8> { fn build_apply_record_msg(endlsn: Lsn, rec: &[u8], buf: &mut Vec<u8>) {
let len = 4 + 8 + rec.len(); let len = 4 + 8 + rec.len();
let mut buf: Vec<u8> = Vec::with_capacity(1 + len);
buf.put_u8(b'A'); buf.put_u8(b'A');
buf.put_u32(len as u32); buf.put_u32(len as u32);
buf.put_u64(endlsn.0); buf.put_u64(endlsn.0);
buf.put(rec); buf.put(rec);
debug_assert!(buf.len() == 1 + len); //debug_assert!(buf.len() - oldlen == 1 + len);
buf
} }
fn build_get_page_msg(tag: BufferTag) -> Vec<u8> { fn build_get_page_msg(tag: BufferTag, buf: &mut Vec<u8>) {
let len = 4 + 1 + 4 * 4; let len = 4 + 1 + 4 * 4;
let mut buf = Vec::with_capacity(1 + len);
buf.put_u8(b'G'); buf.put_u8(b'G');
buf.put_u32(len as u32); buf.put_u32(len as u32);
tag.ser_into(&mut buf) tag.ser_into(buf)
.expect("serialize BufferTag should always succeed"); .expect("serialize BufferTag should always succeed");
debug_assert!(buf.len() == 1 + len); //debug_assert!(buf.len() == 1 + len);
buf
} }

View File

@@ -203,96 +203,38 @@ fn main() -> Result<()> {
) )
.get_matches(); .get_matches();
// Create config file let (sub_name, sub_args) = matches.subcommand();
if let ("init", Some(init_match)) = matches.subcommand() { let sub_args = sub_args.expect("no subcommand");
let toml_file: String = if let Some(config_path) = init_match.value_of("config") {
// load and parse the file // Check for 'zenith init' command first.
std::fs::read_to_string(std::path::Path::new(config_path)) let subcmd_result = if sub_name == "init" {
.with_context(|| format!("Could not read configuration file \"{}\"", config_path))? handle_init(sub_args)
} else { } else {
// Built-in default config // all other commands need an existing config
default_conf() let env = match LocalEnv::load_config() {
Ok(conf) => conf,
Err(e) => {
eprintln!("Error loading config: {}", e);
exit(1);
}
}; };
let mut env = LocalEnv::create_config(&toml_file) match sub_name {
.with_context(|| "Failed to create zenith configuration")?; "tenant" => handle_tenant(sub_args, &env),
env.init() "branch" => handle_branch(sub_args, &env),
.with_context(|| "Failed to initialize zenith repository")?; "start" => handle_start_all(sub_args, &env),
"stop" => handle_stop_all(sub_args, &env),
"pageserver" => handle_pageserver(sub_args, &env),
"pg" => handle_pg(sub_args, &env),
"safekeeper" => handle_safekeeper(sub_args, &env),
_ => bail!("unexpected subcommand {}", sub_name),
}
};
if let Err(e) = subcmd_result {
eprintln!("command failed: {}", e);
exit(1);
} }
// all other commands would need config
let env = match LocalEnv::load_config() {
Ok(conf) => conf,
Err(e) => {
eprintln!("Error loading config: {}", e);
exit(1);
}
};
match matches.subcommand() {
("init", Some(_sub_m)) => {
// The options were handled above already
let pageserver = PageServerNode::from_env(&env);
if let Err(e) = pageserver.init(
// default_tenantid was generated by the `env.init()` call above
Some(&env.default_tenantid.unwrap().to_string()),
) {
eprintln!("pageserver init failed: {}", e);
exit(1);
}
}
("tenant", Some(args)) => {
if let Err(e) = handle_tenant(args, &env) {
eprintln!("tenant command failed: {}", e);
exit(1);
}
}
("branch", Some(sub_args)) => {
if let Err(e) = handle_branch(sub_args, &env) {
eprintln!("branch command failed: {}", e);
exit(1);
}
}
("start", Some(sub_match)) => {
if let Err(e) = handle_start_all(sub_match, &env) {
eprintln!("start command failed: {}", e);
exit(1);
}
}
("stop", Some(sub_match)) => {
if let Err(e) = handle_stop_all(sub_match, &env) {
eprintln!("stop command failed: {}", e);
exit(1);
}
}
("pageserver", Some(sub_match)) => {
if let Err(e) = handle_pageserver(sub_match, &env) {
eprintln!("pageserver command failed: {}", e);
exit(1);
}
}
("pg", Some(pg_match)) => {
if let Err(e) = handle_pg(pg_match, &env) {
eprintln!("pg operation failed: {:?}", e);
exit(1);
}
}
("safekeeper", Some(sub_match)) => {
if let Err(e) = handle_safekeeper(sub_match, &env) {
eprintln!("safekeeper command failed: {}", e);
exit(1);
}
}
_ => {}
};
Ok(()) Ok(())
} }
@@ -438,6 +380,35 @@ fn get_tenantid(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<ZTe
} }
} }
/// Handles the `init` subcommand: creates the local configuration and
/// repository, then runs the pageserver's own init step.
///
/// The configuration TOML text is read from the file named by the `config`
/// argument when it is present; otherwise the built-in default returned by
/// `default_conf()` is used.
///
/// # Errors
///
/// Returns an error if the configuration file cannot be read, if
/// `LocalEnv::create_config` fails, or if `env.init()` fails.
///
/// A failure of the pageserver init step is *not* returned to the caller:
/// it is reported on stderr and the process exits with status 1.
///
/// # Panics
///
/// Panics if `env.default_tenantid` is `None` after `env.init()` succeeded.
fn handle_init(init_match: &ArgMatches) -> Result<()> {
    // Create config file
    let toml_file: String = if let Some(config_path) = init_match.value_of("config") {
        // Read the user-supplied file; its contents are handed to
        // `LocalEnv::create_config` below.
        std::fs::read_to_string(std::path::Path::new(config_path))
            .with_context(|| format!("Could not read configuration file \"{}\"", config_path))?
    } else {
        // Built-in default config
        default_conf()
    };

    let mut env = LocalEnv::create_config(&toml_file)
        .with_context(|| "Failed to create zenith configuration")?;
    env.init()
        .with_context(|| "Failed to initialize zenith repository")?;

    // Call 'pageserver init'.
    let pageserver = PageServerNode::from_env(&env);

    // default_tenantid was generated by the `env.init()` call above, so a
    // missing value here is a bug rather than a user error.
    let default_tenantid = env
        .default_tenantid
        .expect("default_tenantid should have been generated by env.init()")
        .to_string();

    if let Err(e) = pageserver.init(Some(&default_tenantid)) {
        eprintln!("pageserver init failed: {}", e);
        exit(1);
    }

    Ok(())
}
fn handle_tenant(tenant_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { fn handle_tenant(tenant_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
let pageserver = PageServerNode::from_env(env); let pageserver = PageServerNode::from_env(env);
match tenant_match.subcommand() { match tenant_match.subcommand() {
@@ -486,10 +457,11 @@ fn handle_branch(branch_match: &ArgMatches, env: &local_env::LocalEnv) -> Result
fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
let mut cplane = ComputeControlPlane::load(env.clone())?; let mut cplane = ComputeControlPlane::load(env.clone())?;
match pg_match.subcommand() { // All subcommands take an optional --tenantid option
("list", Some(list_match)) => { let tenantid = get_tenantid(pg_match, env)?;
let tenantid = get_tenantid(list_match, env)?;
match pg_match.subcommand() {
("list", Some(_list_match)) => {
let branch_infos = get_branch_infos(env, &tenantid).unwrap_or_else(|e| { let branch_infos = get_branch_infos(env, &tenantid).unwrap_or_else(|e| {
eprintln!("Failed to load branch info: {}", e); eprintln!("Failed to load branch info: {}", e);
HashMap::new() HashMap::new()
@@ -520,7 +492,6 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
} }
} }
("create", Some(create_match)) => { ("create", Some(create_match)) => {
let tenantid = get_tenantid(create_match, env)?;
let node_name = create_match.value_of("node").unwrap_or("main"); let node_name = create_match.value_of("node").unwrap_or("main");
let timeline_name = create_match.value_of("timeline").unwrap_or(node_name); let timeline_name = create_match.value_of("timeline").unwrap_or(node_name);
@@ -531,7 +502,6 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
cplane.new_node(tenantid, node_name, timeline_name, port)?; cplane.new_node(tenantid, node_name, timeline_name, port)?;
} }
("start", Some(start_match)) => { ("start", Some(start_match)) => {
let tenantid = get_tenantid(start_match, env)?;
let node_name = start_match.value_of("node").unwrap_or("main"); let node_name = start_match.value_of("node").unwrap_or("main");
let timeline_name = start_match.value_of("timeline"); let timeline_name = start_match.value_of("timeline");
@@ -572,7 +542,6 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
} }
} }
("stop", Some(stop_match)) => { ("stop", Some(stop_match)) => {
let tenantid = get_tenantid(stop_match, env)?;
let node_name = stop_match.value_of("node").unwrap_or("main"); let node_name = stop_match.value_of("node").unwrap_or("main");
let destroy = stop_match.is_present("destroy"); let destroy = stop_match.is_present("destroy");
@@ -636,26 +605,23 @@ fn get_safekeeper(env: &local_env::LocalEnv, name: &str) -> Result<SafekeeperNod
} }
fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
match sub_match.subcommand() { let (sub_name, sub_args) = sub_match.subcommand();
("start", Some(sub_match)) => { let sub_args = sub_args.expect("no safekeeper subcommand");
let node_name = sub_match
.value_of("node")
.unwrap_or(DEFAULT_SAFEKEEPER_NAME);
let safekeeper = get_safekeeper(env, node_name)?;
// All the commands take an optional safekeeper name argument
let node_name = sub_args.value_of("node").unwrap_or(DEFAULT_SAFEKEEPER_NAME);
let safekeeper = get_safekeeper(env, node_name)?;
match sub_name {
"start" => {
if let Err(e) = safekeeper.start() { if let Err(e) = safekeeper.start() {
eprintln!("safekeeper start failed: {}", e); eprintln!("safekeeper start failed: {}", e);
exit(1); exit(1);
} }
} }
("stop", Some(sub_match)) => { "stop" => {
let node_name = sub_match let immediate = sub_args.value_of("stop-mode") == Some("immediate");
.value_of("node")
.unwrap_or(DEFAULT_SAFEKEEPER_NAME);
let immediate = sub_match.value_of("stop-mode") == Some("immediate");
let safekeeper = get_safekeeper(env, node_name)?;
if let Err(e) = safekeeper.stop(immediate) { if let Err(e) = safekeeper.stop(immediate) {
eprintln!("safekeeper stop failed: {}", e); eprintln!("safekeeper stop failed: {}", e);
@@ -663,15 +629,10 @@ fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul
} }
} }
("restart", Some(sub_match)) => { "restart" => {
let node_name = sub_match let immediate = sub_args.value_of("stop-mode") == Some("immediate");
.value_of("node")
.unwrap_or(DEFAULT_SAFEKEEPER_NAME);
let safekeeper = get_safekeeper(env, node_name)?; if let Err(e) = safekeeper.stop(immediate) {
//TODO what shutdown strategy should we use here?
if let Err(e) = safekeeper.stop(false) {
eprintln!("safekeeper stop failed: {}", e); eprintln!("safekeeper stop failed: {}", e);
exit(1); exit(1);
} }
@@ -682,7 +643,9 @@ fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul
} }
} }
_ => {} _ => {
bail!("Unexpected safekeeper subcommand '{}'", sub_name)
}
} }
Ok(()) Ok(())
} }