mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-17 18:32:56 +00:00
Compare commits
5 Commits
conrad/pro
...
wip-perf
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ea56b4a36a | ||
|
|
7cf7215ce2 | ||
|
|
23713eb44f | ||
|
|
28675739de | ||
|
|
ea90d102e2 |
3
Cargo.lock
generated
3
Cargo.lock
generated
@@ -208,8 +208,6 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "bookfile"
|
||||
version = "0.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "efa3e2086414e1bbecbc10730f265e5b079ab4ea0b830e7219a70dab6471e753"
|
||||
dependencies = [
|
||||
"aversion",
|
||||
"byteorder",
|
||||
@@ -1195,6 +1193,7 @@ dependencies = [
|
||||
"hyper",
|
||||
"lazy_static",
|
||||
"log",
|
||||
"nix",
|
||||
"postgres",
|
||||
"postgres-protocol",
|
||||
"postgres-types",
|
||||
|
||||
@@ -5,7 +5,7 @@ authors = ["Stas Kelvich <stas@zenith.tech>"]
|
||||
edition = "2018"
|
||||
|
||||
[dependencies]
|
||||
bookfile = "^0.3"
|
||||
bookfile = { path = "../../bookfile" }
|
||||
chrono = "0.4.19"
|
||||
rand = "0.8.3"
|
||||
regex = "1.4.5"
|
||||
@@ -37,6 +37,7 @@ async-trait = "0.1"
|
||||
const_format = "0.2.21"
|
||||
tracing = "0.1.27"
|
||||
signal-hook = {version = "0.3.10", features = ["extended-siginfo"] }
|
||||
nix = "0.23"
|
||||
|
||||
postgres_ffi = { path = "../postgres_ffi" }
|
||||
zenith_metrics = { path = "../zenith_metrics" }
|
||||
|
||||
@@ -28,6 +28,7 @@ use daemonize::Daemonize;
|
||||
|
||||
use pageserver::{
|
||||
branches, defaults::*, http, page_service, relish_storage, tenant_mgr, PageServerConf,
|
||||
layered_repository,
|
||||
RelishStorageConfig, RelishStorageKind, S3Config, LOG_FILE_NAME,
|
||||
};
|
||||
use zenith_utils::http::endpoint;
|
||||
@@ -549,6 +550,8 @@ fn start_pageserver(conf: &'static PageServerConf) -> Result<()> {
|
||||
page_service::thread_main(conf, auth, pageserver_listener, conf.auth_type)
|
||||
})?;
|
||||
|
||||
let global_job_thread = layered_repository::launch_global_job_thread(conf);
|
||||
|
||||
for info in SignalsInfo::<WithOrigin>::new(TERM_SIGNALS)?.into_iter() {
|
||||
match info.signal {
|
||||
SIGQUIT => {
|
||||
@@ -577,6 +580,12 @@ fn start_pageserver(conf: &'static PageServerConf) -> Result<()> {
|
||||
.expect("thread panicked")
|
||||
.expect("thread exited with an error");
|
||||
}
|
||||
|
||||
// Shut down global job thread
|
||||
global_job_thread
|
||||
.join()
|
||||
.expect("thread panicked");
|
||||
|
||||
info!("Pageserver shut down successfully completed");
|
||||
exit(0);
|
||||
}
|
||||
|
||||
@@ -230,7 +230,7 @@ fn bootstrap_timeline(
|
||||
timeline.writer().as_ref(),
|
||||
lsn,
|
||||
)?;
|
||||
timeline.checkpoint()?;
|
||||
timeline.checkpoint_forced()?;
|
||||
|
||||
println!(
|
||||
"created initial timeline {} timeline.lsn {}",
|
||||
|
||||
@@ -59,6 +59,7 @@ mod image_layer;
|
||||
mod inmemory_layer;
|
||||
mod interval_tree;
|
||||
mod layer_map;
|
||||
mod jobs;
|
||||
mod page_versions;
|
||||
mod storage_layer;
|
||||
|
||||
@@ -66,10 +67,12 @@ use delta_layer::DeltaLayer;
|
||||
use image_layer::ImageLayer;
|
||||
|
||||
use inmemory_layer::InMemoryLayer;
|
||||
use layer_map::LayerMap;
|
||||
use layer_map::{LayerId, LayerMap};
|
||||
use storage_layer::{
|
||||
Layer, PageReconstructData, PageReconstructResult, SegmentTag, RELISH_SEG_SIZE,
|
||||
};
|
||||
use jobs::{GlobalJob, schedule_job};
|
||||
pub use jobs::launch_global_job_thread;
|
||||
|
||||
static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; 8192]);
|
||||
|
||||
@@ -129,10 +132,17 @@ pub struct LayeredRepository {
|
||||
/// Makes evey repo's timelines to backup their files to remote storage,
|
||||
/// when they get frozen.
|
||||
upload_relishes: bool,
|
||||
|
||||
is_gc_scheduled: Mutex<bool>,
|
||||
}
|
||||
|
||||
/// Public interface
|
||||
impl Repository for LayeredRepository {
|
||||
|
||||
fn upgrade_to_layered_repository(&self) -> &crate::layered_repository::LayeredRepository {
|
||||
self
|
||||
}
|
||||
|
||||
fn get_timeline(&self, timelineid: ZTimelineId) -> Result<Arc<dyn Timeline>> {
|
||||
let mut timelines = self.timelines.lock().unwrap();
|
||||
|
||||
@@ -206,19 +216,33 @@ impl Repository for LayeredRepository {
|
||||
/// Public entry point to GC. All the logic is in the private
|
||||
/// gc_iteration_internal function, this public facade just wraps it for
|
||||
/// metrics collection.
|
||||
fn gc_iteration(
|
||||
fn gc_manual(
|
||||
&self,
|
||||
target_timelineid: Option<ZTimelineId>,
|
||||
horizon: u64,
|
||||
checkpoint_before_gc: bool,
|
||||
) -> Result<GcResult> {
|
||||
STORAGE_TIME
|
||||
.with_label_values(&["gc"])
|
||||
.with_label_values(&["gc_manual"])
|
||||
.observe_closure_duration(|| {
|
||||
self.gc_iteration_internal(target_timelineid, horizon, checkpoint_before_gc)
|
||||
})
|
||||
}
|
||||
|
||||
fn gc_scheduled(&self) -> Result<GcResult> {
|
||||
let result = STORAGE_TIME
|
||||
.with_label_values(&["gc_scheduled"])
|
||||
.observe_closure_duration(|| {
|
||||
self.gc_iteration_internal(None, self.conf.gc_horizon, false)
|
||||
});
|
||||
|
||||
let mut guard = self.is_gc_scheduled.lock().unwrap();
|
||||
|
||||
*guard = false;
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
// Wait for all threads to complete and persist repository data before pageserver shutdown.
|
||||
fn shutdown(&self) -> Result<()> {
|
||||
trace!("LayeredRepository shutdown for tenant {}", self.tenantid);
|
||||
@@ -228,7 +252,7 @@ impl Repository for LayeredRepository {
|
||||
walreceiver::stop_wal_receiver(*timelineid);
|
||||
// Wait for syncing data to disk
|
||||
trace!("repo shutdown. checkpoint timeline {}", timelineid);
|
||||
timeline.checkpoint()?;
|
||||
timeline.checkpoint_forced()?;
|
||||
|
||||
//TODO Wait for walredo process to shutdown too
|
||||
}
|
||||
@@ -306,6 +330,7 @@ impl LayeredRepository {
|
||||
timelines: Mutex::new(HashMap::new()),
|
||||
walredo_mgr,
|
||||
upload_relishes,
|
||||
is_gc_scheduled: Mutex::new(false),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -355,44 +380,6 @@ impl LayeredRepository {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
///
|
||||
/// Launch the GC thread in given repository.
|
||||
///
|
||||
pub fn launch_gc_thread(
|
||||
conf: &'static PageServerConf,
|
||||
rc: Arc<LayeredRepository>,
|
||||
) -> JoinHandle<()> {
|
||||
std::thread::Builder::new()
|
||||
.name("GC thread".into())
|
||||
.spawn(move || {
|
||||
// FIXME: relaunch it? Panic is not good.
|
||||
rc.gc_loop(conf).expect("GC thread died");
|
||||
})
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
///
|
||||
/// GC thread's main loop
|
||||
///
|
||||
fn gc_loop(&self, conf: &'static PageServerConf) -> Result<()> {
|
||||
while !tenant_mgr::shutdown_requested() {
|
||||
// Garbage collect old files that are not needed for PITR anymore
|
||||
if conf.gc_horizon > 0 {
|
||||
self.gc_iteration(None, conf.gc_horizon, false).unwrap();
|
||||
}
|
||||
|
||||
// TODO Write it in more adequate way using
|
||||
// condvar.wait_timeout() or something
|
||||
let mut sleep_time = conf.gc_period.as_secs();
|
||||
while sleep_time > 0 && !tenant_mgr::shutdown_requested() {
|
||||
sleep_time -= 1;
|
||||
std::thread::sleep(Duration::from_secs(1));
|
||||
}
|
||||
info!("gc thread for tenant {} waking up", self.tenantid);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Save timeline metadata to file
|
||||
fn save_metadata(
|
||||
conf: &'static PageServerConf,
|
||||
@@ -546,7 +533,7 @@ impl LayeredRepository {
|
||||
// so that they too can be garbage collected. That's
|
||||
// used in tests, so we want as deterministic results as possible.
|
||||
if checkpoint_before_gc {
|
||||
timeline.checkpoint()?;
|
||||
timeline.checkpoint_forced()?;
|
||||
info!("timeline {} checkpoint_before_gc done", timelineid);
|
||||
}
|
||||
|
||||
@@ -689,10 +676,18 @@ pub struct LayeredTimeline {
|
||||
/// Must always be acquired before the layer map/individual layer lock
|
||||
/// to avoid deadlock.
|
||||
write_lock: Mutex<()>,
|
||||
|
||||
is_checkpoint_scheduled: Mutex<bool>,
|
||||
last_gc: Mutex<Option<Lsn>>,
|
||||
}
|
||||
|
||||
/// Public interface functions
|
||||
impl Timeline for LayeredTimeline {
|
||||
|
||||
fn upgrade_to_layered_timeline(&self) -> &crate::layered_repository::LayeredTimeline {
|
||||
self
|
||||
}
|
||||
|
||||
fn get_ancestor_lsn(&self) -> Lsn {
|
||||
self.ancestor_lsn
|
||||
}
|
||||
@@ -867,13 +862,25 @@ impl Timeline for LayeredTimeline {
|
||||
/// Public entry point for checkpoint(). All the logic is in the private
|
||||
/// checkpoint_internal function, this public facade just wraps it for
|
||||
/// metrics collection.
|
||||
fn checkpoint(&self) -> Result<()> {
|
||||
fn checkpoint_forced(&self) -> Result<()> {
|
||||
STORAGE_TIME
|
||||
.with_label_values(&["checkpoint_force"])
|
||||
.with_label_values(&["checkpoint_forced"])
|
||||
//pass checkpoint_distance=0 to force checkpoint
|
||||
.observe_closure_duration(|| self.checkpoint_internal(0, true))
|
||||
}
|
||||
|
||||
fn checkpoint_scheduled(&self) -> Result<()> {
|
||||
|
||||
let result = STORAGE_TIME
|
||||
.with_label_values(&["checkpoint_scheduled"])
|
||||
.observe_closure_duration(|| self.checkpoint_internal(self.conf.checkpoint_distance, false));
|
||||
|
||||
let mut guard = self.is_checkpoint_scheduled.lock().unwrap();
|
||||
*guard = false;
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
fn get_last_record_lsn(&self) -> Lsn {
|
||||
self.last_record_lsn.load().last
|
||||
}
|
||||
@@ -977,6 +984,9 @@ impl LayeredTimeline {
|
||||
upload_relishes,
|
||||
|
||||
write_lock: Mutex::new(()),
|
||||
|
||||
is_checkpoint_scheduled: Mutex::new(false),
|
||||
last_gc: Mutex::new(None),
|
||||
};
|
||||
Ok(timeline)
|
||||
}
|
||||
@@ -1145,7 +1155,7 @@ impl LayeredTimeline {
|
||||
///
|
||||
/// Get a handle to the latest layer for appending.
|
||||
///
|
||||
fn get_layer_for_write(&self, seg: SegmentTag, lsn: Lsn) -> Result<Arc<InMemoryLayer>> {
|
||||
fn get_layer_for_write(&self, seg: SegmentTag, lsn: Lsn) -> Result<Arc<dyn Layer>> {
|
||||
let mut layers = self.layers.lock().unwrap();
|
||||
|
||||
assert!(lsn.is_aligned());
|
||||
@@ -1160,7 +1170,9 @@ impl LayeredTimeline {
|
||||
|
||||
// Do we have a layer open for writing already?
|
||||
let layer;
|
||||
if let Some(open_layer) = layers.get_open(&seg) {
|
||||
if let Some(open_layer_arc) = layers.get_open(&seg) {
|
||||
let open_layer = open_layer_arc.upgrade_to_inmemory_layer().expect("open layer is not an in-memory layer");
|
||||
|
||||
if open_layer.get_start_lsn() > lsn {
|
||||
bail!("unexpected open layer in the future");
|
||||
}
|
||||
@@ -1185,7 +1197,7 @@ impl LayeredTimeline {
|
||||
lsn,
|
||||
)?;
|
||||
} else {
|
||||
return Ok(open_layer);
|
||||
return Ok(open_layer_arc);
|
||||
}
|
||||
}
|
||||
// No writeable layer for this relation. Create one.
|
||||
@@ -1276,9 +1288,10 @@ impl LayeredTimeline {
|
||||
// a lot of memory and/or aren't receiving much updates anymore.
|
||||
let mut disk_consistent_lsn = last_record_lsn;
|
||||
|
||||
let mut created_historics = false;
|
||||
let mut layer_uploads = Vec::new();
|
||||
while let Some((oldest_layer, oldest_generation)) = layers.peek_oldest_open() {
|
||||
while let Some((oldest_layer_id, oldest_layer_arc, oldest_generation)) = layers.peek_oldest_open() {
|
||||
let oldest_layer = oldest_layer_arc.upgrade_to_inmemory_layer().expect("open layer is not an in-memory layer");
|
||||
|
||||
let oldest_pending_lsn = oldest_layer.get_oldest_pending_lsn();
|
||||
|
||||
if tenant_mgr::shutdown_requested() && !forced {
|
||||
@@ -1307,62 +1320,26 @@ impl LayeredTimeline {
|
||||
break;
|
||||
}
|
||||
|
||||
// Mark the layer as no longer accepting writes and record the end_lsn.
|
||||
// This happens in-place, no new layers are created now.
|
||||
// We call `get_last_record_lsn` again, which may be different from the
|
||||
// original load, as we may have released the write lock since then.
|
||||
oldest_layer.freeze(self.get_last_record_lsn());
|
||||
|
||||
// The layer is no longer open, update the layer map to reflect this.
|
||||
// We will replace it with on-disk historics below.
|
||||
layers.pop_oldest_open();
|
||||
layers.insert_historic(oldest_layer.clone());
|
||||
|
||||
// Write the now-frozen layer to disk. That could take a while, so release the lock while do it
|
||||
drop(layers);
|
||||
drop(write_guard);
|
||||
|
||||
let new_historics = oldest_layer.write_to_disk(self)?;
|
||||
// Evict the layer
|
||||
self.evict_layer(oldest_layer_id)?;
|
||||
|
||||
write_guard = self.write_lock.lock().unwrap();
|
||||
layers = self.layers.lock().unwrap();
|
||||
|
||||
if !new_historics.is_empty() {
|
||||
created_historics = true;
|
||||
}
|
||||
|
||||
// Finally, replace the frozen in-memory layer with the new on-disk layers
|
||||
layers.remove_historic(oldest_layer);
|
||||
|
||||
// Add the historics to the LayerMap
|
||||
for delta_layer in new_historics.delta_layers {
|
||||
layer_uploads.push(delta_layer.path());
|
||||
layers.insert_historic(Arc::new(delta_layer));
|
||||
}
|
||||
for image_layer in new_historics.image_layers {
|
||||
layer_uploads.push(image_layer.path());
|
||||
layers.insert_historic(Arc::new(image_layer));
|
||||
}
|
||||
}
|
||||
|
||||
// Call unload() on all frozen layers, to release memory.
|
||||
// This shouldn't be much memory, as only metadata is slurped
|
||||
// into memory.
|
||||
for layer in layers.iter_historic_layers() {
|
||||
for (_layer_id, layer) in layers.iter_historic_layers() {
|
||||
layer.unload()?;
|
||||
}
|
||||
|
||||
drop(layers);
|
||||
drop(write_guard);
|
||||
|
||||
if created_historics {
|
||||
// We must fsync the timeline dir to ensure the directory entries for
|
||||
// new layer files are durable
|
||||
let timeline_dir =
|
||||
File::open(self.conf.timeline_path(&self.timelineid, &self.tenantid))?;
|
||||
timeline_dir.sync_all()?;
|
||||
}
|
||||
|
||||
// If we were able to advance 'disk_consistent_lsn', save it the metadata file.
|
||||
// After crash, we will restart WAL streaming and processing from that point.
|
||||
let old_disk_consistent_lsn = self.disk_consistent_lsn.load();
|
||||
@@ -1408,6 +1385,131 @@ impl LayeredTimeline {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn schedule_checkpoint_if_needed(&self) -> Result<()> {
|
||||
|
||||
let mut guard = self.is_checkpoint_scheduled.lock().unwrap();
|
||||
if *guard == true {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let RecordLsn {
|
||||
last: last_record_lsn,
|
||||
prev: _prev_record_lsn,
|
||||
} = self.last_record_lsn.load();
|
||||
|
||||
let mut layers = self.layers.lock().unwrap();
|
||||
if let Some((_oldest_layer_id, oldest_layer_arc, _oldest_generation)) = layers.peek_oldest_open() {
|
||||
let oldest_layer = oldest_layer_arc.upgrade_to_inmemory_layer().expect("open layer is not an in-memory layer");
|
||||
let oldest_pending_lsn = oldest_layer.get_oldest_pending_lsn();
|
||||
let distance = last_record_lsn.widening_sub(oldest_pending_lsn);
|
||||
if distance > self.conf.checkpoint_distance.into() {
|
||||
schedule_job(GlobalJob::CheckpointTimeline(self.tenantid, self.timelineid));
|
||||
*guard = true;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn schedule_gc_if_needed(&self) -> Result<()> {
|
||||
|
||||
let RecordLsn {
|
||||
last: last_record_lsn,
|
||||
prev: _prev_record_lsn,
|
||||
} = self.last_record_lsn.load();
|
||||
|
||||
let gc_needed = {
|
||||
let last_gc = self.last_gc.lock().unwrap();
|
||||
|
||||
if let Some(last_gc) = *last_gc {
|
||||
let distance = last_record_lsn.widening_sub(last_gc);
|
||||
if distance > std::cmp::max(10*1024*1024, self.conf.gc_horizon / 2) as i128 {
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
} else {
|
||||
true
|
||||
}
|
||||
};
|
||||
|
||||
if !gc_needed {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let repo = tenant_mgr::get_repository_for_tenant(self.tenantid)?;
|
||||
let repo = repo.upgrade_to_layered_repository();
|
||||
|
||||
let mut gc_scheduled = repo.is_gc_scheduled.lock().unwrap();
|
||||
if *gc_scheduled == true {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
schedule_job(GlobalJob::GarbageCollect(self.tenantid));
|
||||
*gc_scheduled = true;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn evict_layer(&self, layer_id: LayerId) -> Result<()> {
|
||||
let mut write_guard = self.write_lock.lock().unwrap();
|
||||
let mut layers = self.layers.lock().unwrap();
|
||||
|
||||
if let Some(victim_layer_arc) = layers.get_with_id(layer_id) {
|
||||
|
||||
if let Some(victim_layer) = victim_layer_arc.upgrade_to_inmemory_layer() {
|
||||
|
||||
// Mark the layer as no longer accepting writes and record the end_lsn.
|
||||
// This happens in-place, no new layers are created now.
|
||||
// We call `get_last_record_lsn` again, which may be different from the
|
||||
// original load, as we may have released the write lock since then.
|
||||
victim_layer.freeze(self.get_last_record_lsn());
|
||||
|
||||
// The layer is no longer open, update the layer map to reflect this.
|
||||
// We will replace it with on-disk historics below.
|
||||
layers.remove(layer_id);
|
||||
|
||||
let frozen_layer_id = layers.insert_historic(victim_layer_arc.clone());
|
||||
|
||||
// Write the now-frozen layer to disk. That could take a while, so release the lock while do it
|
||||
drop(layers);
|
||||
drop(write_guard);
|
||||
|
||||
let new_historics = victim_layer.write_to_disk(self)?;
|
||||
let created_historics = !new_historics.is_empty();
|
||||
|
||||
write_guard = self.write_lock.lock().unwrap();
|
||||
layers = self.layers.lock().unwrap();
|
||||
|
||||
// Finally, replace the frozen in-memory layer with the new on-disk layers
|
||||
layers.remove(frozen_layer_id);
|
||||
|
||||
// Add the historics to the LayerMap
|
||||
for delta_layer in new_historics.delta_layers {
|
||||
// FIXME layer_uploads.push(delta_layer.path());
|
||||
layers.insert_historic(Arc::new(delta_layer));
|
||||
}
|
||||
for image_layer in new_historics.image_layers {
|
||||
// FIXME layer_uploads.push(image_layer.path());
|
||||
layers.insert_historic(Arc::new(image_layer));
|
||||
}
|
||||
|
||||
if created_historics {
|
||||
// We must fsync the timeline dir to ensure the directory entries for
|
||||
// new layer files are durable
|
||||
//
|
||||
// TODO: it's inefficient to do this after every eviction, if we're evicting
|
||||
// a lot of layers.
|
||||
let timeline_dir =
|
||||
File::open(self.conf.timeline_path(&self.timelineid, &self.tenantid))?;
|
||||
timeline_dir.sync_all()?;
|
||||
}
|
||||
drop(layers);
|
||||
drop(write_guard);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
///
|
||||
/// Garbage collect layer files on a timeline that are no longer needed.
|
||||
///
|
||||
@@ -1440,7 +1542,7 @@ impl LayeredTimeline {
|
||||
|
||||
debug!("retain_lsns: {:?}", retain_lsns);
|
||||
|
||||
let mut layers_to_remove: Vec<Arc<dyn Layer>> = Vec::new();
|
||||
let mut layers_to_remove: Vec<LayerId> = Vec::new();
|
||||
|
||||
// Scan all on-disk layers in the timeline.
|
||||
//
|
||||
@@ -1451,7 +1553,7 @@ impl LayeredTimeline {
|
||||
// 4. this layer doesn't serve as a tombstone for some older layer;
|
||||
//
|
||||
let mut layers = self.layers.lock().unwrap();
|
||||
'outer: for l in layers.iter_historic_layers() {
|
||||
'outer: for (layer_id, l) in layers.iter_historic_layers() {
|
||||
let seg = l.get_seg_tag();
|
||||
|
||||
if seg.rel.is_relation() {
|
||||
@@ -1462,7 +1564,7 @@ impl LayeredTimeline {
|
||||
|
||||
// 1. Is it newer than cutoff point?
|
||||
if l.get_end_lsn() > cutoff {
|
||||
info!(
|
||||
trace!(
|
||||
"keeping {} {}-{} because it's newer than cutoff {}",
|
||||
seg,
|
||||
l.get_start_lsn(),
|
||||
@@ -1481,7 +1583,7 @@ impl LayeredTimeline {
|
||||
for retain_lsn in &retain_lsns {
|
||||
// start_lsn is inclusive and end_lsn is exclusive
|
||||
if l.get_start_lsn() <= *retain_lsn && *retain_lsn < l.get_end_lsn() {
|
||||
info!(
|
||||
trace!(
|
||||
"keeping {} {}-{} because it's needed by branch point {}",
|
||||
seg,
|
||||
l.get_start_lsn(),
|
||||
@@ -1500,7 +1602,7 @@ impl LayeredTimeline {
|
||||
// 3. Is there a later on-disk layer for this relation?
|
||||
if !l.is_dropped() && !layers.newer_image_layer_exists(l.get_seg_tag(), l.get_end_lsn())
|
||||
{
|
||||
info!(
|
||||
trace!(
|
||||
"keeping {} {}-{} because it is the latest layer",
|
||||
seg,
|
||||
l.get_start_lsn(),
|
||||
@@ -1569,7 +1671,7 @@ impl LayeredTimeline {
|
||||
}
|
||||
|
||||
if is_tombstone {
|
||||
info!(
|
||||
trace!(
|
||||
"keeping {} {}-{} because this layer servers as a tombstome for older layer",
|
||||
seg,
|
||||
l.get_start_lsn(),
|
||||
@@ -1593,27 +1695,32 @@ impl LayeredTimeline {
|
||||
l.get_end_lsn(),
|
||||
l.is_dropped()
|
||||
);
|
||||
layers_to_remove.push(Arc::clone(&l));
|
||||
layers_to_remove.push(layer_id);
|
||||
}
|
||||
|
||||
// Actually delete the layers from disk and remove them from the map.
|
||||
// (couldn't do this in the loop above, because you cannot modify a collection
|
||||
// while iterating it. BTreeMap::retain() would be another option)
|
||||
for doomed_layer in layers_to_remove {
|
||||
doomed_layer.delete()?;
|
||||
layers.remove_historic(doomed_layer.clone());
|
||||
for doomed_layer_id in layers_to_remove {
|
||||
if let Some(doomed_layer) = layers.get_with_id(doomed_layer_id) {
|
||||
|
||||
match (
|
||||
doomed_layer.is_dropped(),
|
||||
doomed_layer.get_seg_tag().rel.is_relation(),
|
||||
) {
|
||||
(true, true) => result.ondisk_relfiles_dropped += 1,
|
||||
(true, false) => result.ondisk_nonrelfiles_dropped += 1,
|
||||
(false, true) => result.ondisk_relfiles_removed += 1,
|
||||
(false, false) => result.ondisk_nonrelfiles_removed += 1,
|
||||
doomed_layer.delete()?;
|
||||
layers.remove(doomed_layer_id);
|
||||
match (
|
||||
doomed_layer.is_dropped(),
|
||||
doomed_layer.get_seg_tag().rel.is_relation(),
|
||||
) {
|
||||
(true, true) => result.ondisk_relfiles_dropped += 1,
|
||||
(true, false) => result.ondisk_nonrelfiles_dropped += 1,
|
||||
(false, true) => result.ondisk_relfiles_removed += 1,
|
||||
(false, false) => result.ondisk_nonrelfiles_removed += 1,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let mut guard = self.last_gc.lock().unwrap();
|
||||
*guard = Some(cutoff);
|
||||
|
||||
result.elapsed = now.elapsed();
|
||||
Ok(result)
|
||||
}
|
||||
@@ -1806,9 +1913,10 @@ impl<'a> TimelineWriter for LayeredTimelineWriter<'a> {
|
||||
|
||||
let seg = SegmentTag::from_blknum(rel, blknum);
|
||||
let layer = self.tl.get_layer_for_write(seg, lsn)?;
|
||||
let delta_size = layer.put_wal_record(lsn, blknum, rec);
|
||||
let delta_size = layer.upgrade_to_inmemory_layer().unwrap().put_wal_record(lsn, blknum, rec);
|
||||
self.tl
|
||||
.increase_current_logical_size(delta_size * BLCKSZ as u32);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1825,7 +1933,7 @@ impl<'a> TimelineWriter for LayeredTimelineWriter<'a> {
|
||||
let seg = SegmentTag::from_blknum(rel, blknum);
|
||||
|
||||
let layer = self.tl.get_layer_for_write(seg, lsn)?;
|
||||
let delta_size = layer.put_page_image(blknum, lsn, img);
|
||||
let delta_size = layer.upgrade_to_inmemory_layer().unwrap().put_page_image(blknum, lsn, img);
|
||||
|
||||
self.tl
|
||||
.increase_current_logical_size(delta_size * BLCKSZ as u32);
|
||||
@@ -1870,7 +1978,7 @@ impl<'a> TimelineWriter for LayeredTimelineWriter<'a> {
|
||||
};
|
||||
|
||||
let layer = self.tl.get_layer_for_write(seg, lsn)?;
|
||||
layer.drop_segment(lsn);
|
||||
layer.upgrade_to_inmemory_layer().unwrap().drop_segment(lsn);
|
||||
}
|
||||
|
||||
// Truncate the last remaining segment to the specified size
|
||||
@@ -1880,7 +1988,7 @@ impl<'a> TimelineWriter for LayeredTimelineWriter<'a> {
|
||||
segno: last_remain_seg,
|
||||
};
|
||||
let layer = self.tl.get_layer_for_write(seg, lsn)?;
|
||||
layer.put_truncation(lsn, relsize % RELISH_SEG_SIZE)
|
||||
layer.upgrade_to_inmemory_layer().unwrap().put_truncation(lsn, relsize % RELISH_SEG_SIZE)
|
||||
}
|
||||
self.tl
|
||||
.decrease_current_logical_size((oldsize - relsize) * BLCKSZ as u32);
|
||||
@@ -1908,7 +2016,7 @@ impl<'a> TimelineWriter for LayeredTimelineWriter<'a> {
|
||||
segno: remove_segno,
|
||||
};
|
||||
let layer = self.tl.get_layer_for_write(seg, lsn)?;
|
||||
layer.drop_segment(lsn);
|
||||
layer.upgrade_to_inmemory_layer().unwrap().drop_segment(lsn);
|
||||
}
|
||||
self.tl
|
||||
.decrease_current_logical_size(oldsize * BLCKSZ as u32);
|
||||
@@ -1922,7 +2030,7 @@ impl<'a> TimelineWriter for LayeredTimelineWriter<'a> {
|
||||
// TODO handle TwoPhase relishes
|
||||
let seg = SegmentTag::from_blknum(rel, 0);
|
||||
let layer = self.tl.get_layer_for_write(seg, lsn)?;
|
||||
layer.drop_segment(lsn);
|
||||
layer.upgrade_to_inmemory_layer().unwrap().drop_segment(lsn);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -6,8 +6,8 @@ use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct BlobRange {
|
||||
offset: u64,
|
||||
size: usize,
|
||||
pub offset: u64,
|
||||
pub size: usize,
|
||||
}
|
||||
|
||||
pub fn read_blob(reader: &BoundedReader<&'_ File>, range: &BlobRange) -> Result<Vec<u8>> {
|
||||
|
||||
@@ -42,6 +42,7 @@ use crate::layered_repository::filename::{DeltaFileName, PathOrConf};
|
||||
use crate::layered_repository::storage_layer::{
|
||||
Layer, PageReconstructData, PageReconstructResult, PageVersion, SegmentTag,
|
||||
};
|
||||
use crate::vfd::VirtualFile;
|
||||
use crate::waldecoder;
|
||||
use crate::PageServerConf;
|
||||
use crate::{ZTenantId, ZTimelineId};
|
||||
@@ -145,9 +146,15 @@ pub struct DeltaLayerInner {
|
||||
|
||||
/// `relsizes` tracks the size of the relation at different points in time.
|
||||
relsizes: VecMap<Lsn, u32>,
|
||||
|
||||
vfile: VirtualFile,
|
||||
}
|
||||
|
||||
impl Layer for DeltaLayer {
|
||||
fn get_tenant_id(&self) -> ZTenantId {
|
||||
self.tenantid
|
||||
}
|
||||
|
||||
fn get_timeline_id(&self) -> ZTimelineId {
|
||||
self.timelineid
|
||||
}
|
||||
@@ -186,9 +193,11 @@ impl Layer for DeltaLayer {
|
||||
{
|
||||
// Open the file and lock the metadata in memory
|
||||
// TODO: avoid opening the file for each read
|
||||
let (_path, book) = self.open_book()?;
|
||||
let mut inner = self.load()?;
|
||||
let file = inner.vfile.open()?;
|
||||
let book = Book::new(file)?;
|
||||
|
||||
let page_version_reader = book.chapter_reader(PAGE_VERSIONS_CHAPTER)?;
|
||||
let inner = self.load()?;
|
||||
|
||||
// Scan the metadata BTreeMap backwards, starting from the given entry.
|
||||
let minkey = (blknum, Lsn(0));
|
||||
@@ -221,6 +230,9 @@ impl Layer for DeltaLayer {
|
||||
}
|
||||
|
||||
// release metadata lock and close the file
|
||||
|
||||
let file = book.close();
|
||||
inner.vfile.cache(file);
|
||||
}
|
||||
|
||||
// If an older page image is needed to reconstruct the page, let the
|
||||
@@ -365,6 +377,18 @@ impl DeltaLayer {
|
||||
assert!(!relsizes.is_empty());
|
||||
}
|
||||
|
||||
let path = Self::path_for(
|
||||
&PathOrConf::Conf(conf),
|
||||
timelineid,
|
||||
tenantid,
|
||||
&DeltaFileName {
|
||||
seg: seg,
|
||||
start_lsn: start_lsn,
|
||||
end_lsn: end_lsn,
|
||||
dropped: dropped,
|
||||
}
|
||||
);
|
||||
|
||||
let delta_layer = DeltaLayer {
|
||||
path_or_conf: PathOrConf::Conf(conf),
|
||||
timelineid,
|
||||
@@ -377,6 +401,7 @@ impl DeltaLayer {
|
||||
loaded: true,
|
||||
page_version_metas: VecMap::default(),
|
||||
relsizes,
|
||||
vfile: VirtualFile::new(&path),
|
||||
}),
|
||||
};
|
||||
let mut inner = delta_layer.inner.lock().unwrap();
|
||||
@@ -496,11 +521,9 @@ impl DeltaLayer {
|
||||
|
||||
debug!("loaded from {}", &path.display());
|
||||
|
||||
*inner = DeltaLayerInner {
|
||||
loaded: true,
|
||||
page_version_metas,
|
||||
relsizes,
|
||||
};
|
||||
inner.loaded = true;
|
||||
inner.page_version_metas = page_version_metas;
|
||||
inner.relsizes = relsizes;
|
||||
|
||||
Ok(inner)
|
||||
}
|
||||
@@ -512,6 +535,13 @@ impl DeltaLayer {
|
||||
tenantid: ZTenantId,
|
||||
filename: &DeltaFileName,
|
||||
) -> DeltaLayer {
|
||||
let path = Self::path_for(
|
||||
&PathOrConf::Conf(conf),
|
||||
timelineid,
|
||||
tenantid,
|
||||
&filename,
|
||||
);
|
||||
|
||||
DeltaLayer {
|
||||
path_or_conf: PathOrConf::Conf(conf),
|
||||
timelineid,
|
||||
@@ -524,6 +554,7 @@ impl DeltaLayer {
|
||||
loaded: false,
|
||||
page_version_metas: VecMap::default(),
|
||||
relsizes: VecMap::default(),
|
||||
vfile: VirtualFile::new(&path),
|
||||
}),
|
||||
}
|
||||
}
|
||||
@@ -534,7 +565,7 @@ impl DeltaLayer {
|
||||
pub fn new_for_path(path: &Path, book: &Book<File>) -> Result<Self> {
|
||||
let chapter = book.read_chapter(SUMMARY_CHAPTER)?;
|
||||
let summary = Summary::des(&chapter)?;
|
||||
|
||||
|
||||
Ok(DeltaLayer {
|
||||
path_or_conf: PathOrConf::Path(path.to_path_buf()),
|
||||
timelineid: summary.timelineid,
|
||||
@@ -547,6 +578,7 @@ impl DeltaLayer {
|
||||
loaded: false,
|
||||
page_version_metas: VecMap::default(),
|
||||
relsizes: VecMap::default(),
|
||||
vfile: VirtualFile::new(path),
|
||||
}),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -29,6 +29,7 @@ use crate::layered_repository::LayeredTimeline;
|
||||
use crate::layered_repository::RELISH_SEG_SIZE;
|
||||
use crate::PageServerConf;
|
||||
use crate::{ZTenantId, ZTimelineId};
|
||||
use crate::vfd::VirtualFile;
|
||||
use anyhow::{anyhow, bail, ensure, Result};
|
||||
use bytes::Bytes;
|
||||
use log::*;
|
||||
@@ -36,7 +37,7 @@ use serde::{Deserialize, Serialize};
|
||||
use std::convert::TryInto;
|
||||
use std::fs;
|
||||
use std::fs::File;
|
||||
use std::io::{BufWriter, Write};
|
||||
use std::io::{BufWriter, Read, Seek, SeekFrom, Write};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::{Mutex, MutexGuard};
|
||||
|
||||
@@ -110,6 +111,8 @@ pub struct ImageLayerInner {
|
||||
|
||||
/// Derived from filename and bookfile chapter metadata
|
||||
image_type: ImageType,
|
||||
|
||||
vfile: VirtualFile,
|
||||
}
|
||||
|
||||
impl Layer for ImageLayer {
|
||||
@@ -117,6 +120,10 @@ impl Layer for ImageLayer {
|
||||
PathBuf::from(self.layer_name().to_string())
|
||||
}
|
||||
|
||||
fn get_tenant_id(&self) -> ZTenantId {
|
||||
self.tenantid
|
||||
}
|
||||
|
||||
fn get_timeline_id(&self) -> ZTimelineId {
|
||||
self.timelineid
|
||||
}
|
||||
@@ -147,11 +154,12 @@ impl Layer for ImageLayer {
|
||||
) -> Result<PageReconstructResult> {
|
||||
assert!(lsn >= self.lsn);
|
||||
|
||||
let inner = self.load()?;
|
||||
let mut inner = self.load()?;
|
||||
|
||||
let base_blknum = blknum % RELISH_SEG_SIZE;
|
||||
|
||||
let (_path, book) = self.open_book()?;
|
||||
let mut file = inner.vfile.open()?;
|
||||
let mut book = Book::new(&mut file)?;
|
||||
|
||||
let buf = match &inner.image_type {
|
||||
ImageType::Blocky { num_blocks } => {
|
||||
@@ -162,17 +170,20 @@ impl Layer for ImageLayer {
|
||||
let mut buf = vec![0u8; BLOCK_SIZE];
|
||||
let offset = BLOCK_SIZE as u64 * base_blknum as u64;
|
||||
|
||||
let chapter = book.chapter_reader(BLOCKY_IMAGES_CHAPTER)?;
|
||||
chapter.read_exact_at(&mut buf, offset)?;
|
||||
let mut chapter = book.exclusive_chapter_reader(BLOCKY_IMAGES_CHAPTER)?;
|
||||
chapter.seek(SeekFrom::Start(offset))?;
|
||||
chapter.read_exact(&mut buf)?;
|
||||
|
||||
buf
|
||||
}
|
||||
ImageType::NonBlocky => {
|
||||
ensure!(base_blknum == 0);
|
||||
book.read_chapter(NONBLOCKY_IMAGE_CHAPTER)?.into_vec()
|
||||
book.exclusive_read_chapter(NONBLOCKY_IMAGE_CHAPTER)?.into_vec()
|
||||
}
|
||||
};
|
||||
|
||||
inner.vfile.cache(file);
|
||||
|
||||
reconstruct_data.page_img = Some(Bytes::from(buf));
|
||||
Ok(PageReconstructResult::Complete)
|
||||
}
|
||||
@@ -266,6 +277,16 @@ impl ImageLayer {
|
||||
ImageType::NonBlocky
|
||||
};
|
||||
|
||||
let path = Self::path_for(
|
||||
&PathOrConf::Conf(conf),
|
||||
timelineid,
|
||||
tenantid,
|
||||
&ImageFileName {
|
||||
seg: seg,
|
||||
lsn: lsn,
|
||||
}
|
||||
);
|
||||
|
||||
let layer = ImageLayer {
|
||||
path_or_conf: PathOrConf::Conf(conf),
|
||||
timelineid,
|
||||
@@ -275,12 +296,12 @@ impl ImageLayer {
|
||||
inner: Mutex::new(ImageLayerInner {
|
||||
loaded: true,
|
||||
image_type: image_type.clone(),
|
||||
vfile: VirtualFile::new(&path),
|
||||
}),
|
||||
};
|
||||
let inner = layer.inner.lock().unwrap();
|
||||
|
||||
// Write the images into a file
|
||||
let path = layer.path();
|
||||
// Note: This overwrites any existing file. There shouldn't be any.
|
||||
// FIXME: throw an error instead?
|
||||
let file = File::create(&path)?;
|
||||
@@ -374,7 +395,8 @@ impl ImageLayer {
|
||||
return Ok(inner);
|
||||
}
|
||||
|
||||
let (path, book) = self.open_book()?;
|
||||
|
||||
let book = Book::new(inner.vfile.open()?)?;
|
||||
|
||||
match &self.path_or_conf {
|
||||
PathOrConf::Conf(_) => {
|
||||
@@ -412,12 +434,10 @@ impl ImageLayer {
|
||||
ImageType::NonBlocky
|
||||
};
|
||||
|
||||
debug!("loaded from {}", &path.display());
|
||||
debug!("loaded from {}", &self.path().display());
|
||||
|
||||
*inner = ImageLayerInner {
|
||||
loaded: true,
|
||||
image_type,
|
||||
};
|
||||
inner.loaded = true;
|
||||
inner.image_type = image_type;
|
||||
|
||||
Ok(inner)
|
||||
}
|
||||
@@ -438,6 +458,14 @@ impl ImageLayer {
|
||||
tenantid: ZTenantId,
|
||||
filename: &ImageFileName,
|
||||
) -> ImageLayer {
|
||||
|
||||
let path = Self::path_for(
|
||||
&PathOrConf::Conf(conf),
|
||||
timelineid,
|
||||
tenantid,
|
||||
filename,
|
||||
);
|
||||
|
||||
ImageLayer {
|
||||
path_or_conf: PathOrConf::Conf(conf),
|
||||
timelineid,
|
||||
@@ -447,6 +475,7 @@ impl ImageLayer {
|
||||
inner: Mutex::new(ImageLayerInner {
|
||||
loaded: false,
|
||||
image_type: ImageType::Blocky { num_blocks: 0 },
|
||||
vfile: VirtualFile::new(&path),
|
||||
}),
|
||||
}
|
||||
}
|
||||
@@ -467,6 +496,7 @@ impl ImageLayer {
|
||||
inner: Mutex::new(ImageLayerInner {
|
||||
loaded: false,
|
||||
image_type: ImageType::Blocky { num_blocks: 0 },
|
||||
vfile: VirtualFile::new(path),
|
||||
}),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -90,6 +90,11 @@ impl InMemoryLayerInner {
|
||||
}
|
||||
|
||||
impl Layer for InMemoryLayer {
|
||||
|
||||
fn upgrade_to_inmemory_layer(&self) -> Option<&InMemoryLayer> {
|
||||
Some(self)
|
||||
}
|
||||
|
||||
// An in-memory layer doesn't really have a filename as it's not stored on disk,
|
||||
// but we construct a filename as if it was a delta layer
|
||||
fn filename(&self) -> PathBuf {
|
||||
@@ -113,6 +118,10 @@ impl Layer for InMemoryLayer {
|
||||
PathBuf::from(format!("inmem-{}", delta_filename))
|
||||
}
|
||||
|
||||
fn get_tenant_id(&self) -> ZTenantId {
|
||||
self.tenantid
|
||||
}
|
||||
|
||||
fn get_timeline_id(&self) -> ZTimelineId {
|
||||
self.timelineid
|
||||
}
|
||||
|
||||
@@ -41,23 +41,22 @@
|
||||
use std::collections::BTreeMap;
|
||||
use std::fmt::Debug;
|
||||
use std::ops::Range;
|
||||
use std::sync::Arc;
|
||||
|
||||
pub struct IntervalTree<I: ?Sized>
|
||||
pub struct IntervalTree<I>
|
||||
where
|
||||
I: IntervalItem,
|
||||
{
|
||||
points: BTreeMap<I::Key, Point<I>>,
|
||||
}
|
||||
|
||||
struct Point<I: ?Sized> {
|
||||
struct Point<I> {
|
||||
/// All intervals that contain this point, in no particular order.
|
||||
///
|
||||
/// We assume that there aren't a lot of overlappingg intervals, so that this vector
|
||||
/// never grows very large. If that assumption doesn't hold, we could keep this ordered
|
||||
/// by the end bound, to speed up `search`. But as long as there are only a few elements,
|
||||
/// a linear search is OK.
|
||||
elements: Vec<Arc<I>>,
|
||||
elements: Vec<I>,
|
||||
}
|
||||
|
||||
/// Abstraction for an interval that can be stored in the tree
|
||||
@@ -75,14 +74,14 @@ pub trait IntervalItem {
|
||||
}
|
||||
}
|
||||
|
||||
impl<I: ?Sized> IntervalTree<I>
|
||||
impl<I> IntervalTree<I>
|
||||
where
|
||||
I: IntervalItem,
|
||||
I: IntervalItem + PartialEq + Clone,
|
||||
{
|
||||
/// Return an element that contains 'key', or precedes it.
|
||||
///
|
||||
/// If there are multiple candidates, returns the one with the highest 'end' key.
|
||||
pub fn search(&self, key: I::Key) -> Option<Arc<I>> {
|
||||
pub fn search(&self, key: I::Key) -> Option<&I> {
|
||||
// Find the greatest point that precedes or is equal to the search key. If there is
|
||||
// none, returns None.
|
||||
let (_, p) = self.points.range(..=key).next_back()?;
|
||||
@@ -100,7 +99,7 @@ where
|
||||
}
|
||||
})
|
||||
.unwrap();
|
||||
Some(Arc::clone(highest_item))
|
||||
Some(highest_item)
|
||||
}
|
||||
|
||||
/// Iterate over all items with start bound >= 'key'
|
||||
@@ -119,7 +118,7 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
pub fn insert(&mut self, item: Arc<I>) {
|
||||
pub fn insert(&mut self, item: &I) {
|
||||
let start_key = item.start_key();
|
||||
let end_key = item.end_key();
|
||||
assert!(start_key < end_key);
|
||||
@@ -133,18 +132,18 @@ where
|
||||
found_start_point = true;
|
||||
// It is an error to insert the same item to the tree twice.
|
||||
assert!(
|
||||
!point.elements.iter().any(|x| Arc::ptr_eq(x, &item)),
|
||||
!point.elements.iter().any(|x| x == item),
|
||||
"interval is already in the tree"
|
||||
);
|
||||
}
|
||||
point.elements.push(Arc::clone(&item));
|
||||
point.elements.push(item.clone());
|
||||
}
|
||||
if !found_start_point {
|
||||
// Create a new Point for the starting point
|
||||
|
||||
// Look at the previous point, and copy over elements that overlap with this
|
||||
// new point
|
||||
let mut new_elements: Vec<Arc<I>> = Vec::new();
|
||||
let mut new_elements: Vec<I> = Vec::new();
|
||||
if let Some((_, prev_point)) = self.points.range(..start_key).next_back() {
|
||||
let overlapping_prev_elements = prev_point
|
||||
.elements
|
||||
@@ -154,7 +153,7 @@ where
|
||||
|
||||
new_elements.extend(overlapping_prev_elements);
|
||||
}
|
||||
new_elements.push(item);
|
||||
new_elements.push(item.clone());
|
||||
|
||||
let new_point = Point {
|
||||
elements: new_elements,
|
||||
@@ -163,7 +162,7 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
pub fn remove(&mut self, item: &Arc<I>) {
|
||||
pub fn remove(&mut self, item: &I) {
|
||||
// range search points
|
||||
let start_key = item.start_key();
|
||||
let end_key = item.end_key();
|
||||
@@ -176,7 +175,7 @@ where
|
||||
found_start_point = true;
|
||||
}
|
||||
let len_before = point.elements.len();
|
||||
point.elements.retain(|other| !Arc::ptr_eq(other, item));
|
||||
point.elements.retain(|other| other != item);
|
||||
let len_after = point.elements.len();
|
||||
assert_eq!(len_after + 1, len_before);
|
||||
if len_after == 0 {
|
||||
@@ -191,19 +190,19 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
pub struct IntervalIter<'a, I: ?Sized>
|
||||
pub struct IntervalIter<'a, I>
|
||||
where
|
||||
I: IntervalItem,
|
||||
{
|
||||
point_iter: std::collections::btree_map::Range<'a, I::Key, Point<I>>,
|
||||
elem_iter: Option<(I::Key, std::slice::Iter<'a, Arc<I>>)>,
|
||||
elem_iter: Option<(I::Key, std::slice::Iter<'a, I>)>,
|
||||
}
|
||||
|
||||
impl<'a, I> Iterator for IntervalIter<'a, I>
|
||||
where
|
||||
I: IntervalItem + ?Sized,
|
||||
I: IntervalItem,
|
||||
{
|
||||
type Item = Arc<I>;
|
||||
type Item = &'a I;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
// Iterate over all elements in all the points in 'point_iter'. To avoid
|
||||
@@ -214,7 +213,7 @@ where
|
||||
if let Some((point_key, elem_iter)) = &mut self.elem_iter {
|
||||
for elem in elem_iter {
|
||||
if elem.start_key() == *point_key {
|
||||
return Some(Arc::clone(elem));
|
||||
return Some(elem);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -230,7 +229,7 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<I: ?Sized> Default for IntervalTree<I>
|
||||
impl<I> Default for IntervalTree<I>
|
||||
where
|
||||
I: IntervalItem,
|
||||
{
|
||||
@@ -246,7 +245,7 @@ mod tests {
|
||||
use super::*;
|
||||
use std::fmt;
|
||||
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
struct MockItem {
|
||||
start_key: u32,
|
||||
end_key: u32,
|
||||
@@ -288,7 +287,7 @@ mod tests {
|
||||
tree: &IntervalTree<MockItem>,
|
||||
key: u32,
|
||||
expected: &[&str],
|
||||
) -> Option<Arc<MockItem>> {
|
||||
) -> Option<MockItem> {
|
||||
if let Some(v) = tree.search(key) {
|
||||
let vstr = v.to_string();
|
||||
|
||||
@@ -299,7 +298,7 @@ mod tests {
|
||||
key, v, expected,
|
||||
);
|
||||
|
||||
Some(v)
|
||||
Some(v.clone())
|
||||
} else {
|
||||
assert!(
|
||||
expected.is_empty(),
|
||||
@@ -331,12 +330,12 @@ mod tests {
|
||||
let mut tree: IntervalTree<MockItem> = IntervalTree::default();
|
||||
|
||||
// Simple, non-overlapping ranges.
|
||||
tree.insert(Arc::new(MockItem::new(10, 11)));
|
||||
tree.insert(Arc::new(MockItem::new(11, 12)));
|
||||
tree.insert(Arc::new(MockItem::new(12, 13)));
|
||||
tree.insert(Arc::new(MockItem::new(18, 19)));
|
||||
tree.insert(Arc::new(MockItem::new(17, 18)));
|
||||
tree.insert(Arc::new(MockItem::new(15, 16)));
|
||||
tree.insert(&MockItem::new(10, 11));
|
||||
tree.insert(&MockItem::new(11, 12));
|
||||
tree.insert(&MockItem::new(12, 13));
|
||||
tree.insert(&MockItem::new(18, 19));
|
||||
tree.insert(&MockItem::new(17, 18));
|
||||
tree.insert(&MockItem::new(15, 16));
|
||||
|
||||
assert_search(&tree, 9, &[]);
|
||||
assert_search(&tree, 10, &["10-11"]);
|
||||
@@ -370,13 +369,13 @@ mod tests {
|
||||
let mut tree: IntervalTree<MockItem> = IntervalTree::default();
|
||||
|
||||
// Overlapping items
|
||||
tree.insert(Arc::new(MockItem::new(22, 24)));
|
||||
tree.insert(Arc::new(MockItem::new(23, 25)));
|
||||
let x24_26 = Arc::new(MockItem::new(24, 26));
|
||||
tree.insert(Arc::clone(&x24_26));
|
||||
let x26_28 = Arc::new(MockItem::new(26, 28));
|
||||
tree.insert(Arc::clone(&x26_28));
|
||||
tree.insert(Arc::new(MockItem::new(25, 27)));
|
||||
tree.insert(&MockItem::new(22, 24));
|
||||
tree.insert(&MockItem::new(23, 25));
|
||||
let x24_26 = MockItem::new(24, 26);
|
||||
tree.insert(&x24_26);
|
||||
let x26_28 = MockItem::new(26, 28);
|
||||
tree.insert(&x26_28);
|
||||
tree.insert(&MockItem::new(25, 27));
|
||||
|
||||
assert_search(&tree, 22, &["22-24"]);
|
||||
assert_search(&tree, 23, &["22-24", "23-25"]);
|
||||
@@ -403,10 +402,10 @@ mod tests {
|
||||
let mut tree: IntervalTree<MockItem> = IntervalTree::default();
|
||||
|
||||
// Items containing other items
|
||||
tree.insert(Arc::new(MockItem::new(31, 39)));
|
||||
tree.insert(Arc::new(MockItem::new(32, 34)));
|
||||
tree.insert(Arc::new(MockItem::new(33, 35)));
|
||||
tree.insert(Arc::new(MockItem::new(30, 40)));
|
||||
tree.insert(&MockItem::new(31, 39));
|
||||
tree.insert(&MockItem::new(32, 34));
|
||||
tree.insert(&MockItem::new(33, 35));
|
||||
tree.insert(&MockItem::new(30, 40));
|
||||
|
||||
assert_search(&tree, 30, &["30-40"]);
|
||||
assert_search(&tree, 31, &["30-40", "31-39"]);
|
||||
@@ -427,16 +426,16 @@ mod tests {
|
||||
let mut tree: IntervalTree<MockItem> = IntervalTree::default();
|
||||
|
||||
// Duplicate keys
|
||||
let item_a = Arc::new(MockItem::new_str(55, 56, "a"));
|
||||
tree.insert(Arc::clone(&item_a));
|
||||
let item_b = Arc::new(MockItem::new_str(55, 56, "b"));
|
||||
tree.insert(Arc::clone(&item_b));
|
||||
let item_c = Arc::new(MockItem::new_str(55, 56, "c"));
|
||||
tree.insert(Arc::clone(&item_c));
|
||||
let item_d = Arc::new(MockItem::new_str(54, 56, "d"));
|
||||
tree.insert(Arc::clone(&item_d));
|
||||
let item_e = Arc::new(MockItem::new_str(55, 57, "e"));
|
||||
tree.insert(Arc::clone(&item_e));
|
||||
let item_a = MockItem::new_str(55, 56, "a");
|
||||
tree.insert(&item_a);
|
||||
let item_b = MockItem::new_str(55, 56, "b");
|
||||
tree.insert(&item_b);
|
||||
let item_c = MockItem::new_str(55, 56, "c");
|
||||
tree.insert(&item_c);
|
||||
let item_d = MockItem::new_str(54, 56, "d");
|
||||
tree.insert(&item_d);
|
||||
let item_e = MockItem::new_str(55, 57, "e");
|
||||
tree.insert(&item_e);
|
||||
|
||||
dump_tree(&tree);
|
||||
|
||||
@@ -461,8 +460,8 @@ mod tests {
|
||||
let mut tree: IntervalTree<MockItem> = IntervalTree::default();
|
||||
|
||||
// Inserting the same item twice is not cool
|
||||
let item = Arc::new(MockItem::new(1, 2));
|
||||
tree.insert(Arc::clone(&item));
|
||||
tree.insert(Arc::clone(&item)); // fails assertion
|
||||
let item = MockItem::new(1, 2);
|
||||
tree.insert(&item);
|
||||
tree.insert(&item); // fails assertion
|
||||
}
|
||||
}
|
||||
|
||||
129
pageserver/src/layered_repository/jobs.rs
Normal file
129
pageserver/src/layered_repository/jobs.rs
Normal file
@@ -0,0 +1,129 @@
|
||||
use crate::tenant_mgr;
|
||||
use crate::layered_repository::layer_map;
|
||||
use crate::PageServerConf;
|
||||
|
||||
use anyhow::Result;
|
||||
use lazy_static::lazy_static;
|
||||
use tracing::*;
|
||||
|
||||
use std::collections::VecDeque;
|
||||
use std::sync::Mutex;
|
||||
use std::thread::JoinHandle;
|
||||
|
||||
use zenith_utils::zid::{ZTenantId, ZTimelineId};
|
||||
|
||||
lazy_static! {
|
||||
static ref JOB_QUEUE: Mutex<GlobalJobQueue> = Mutex::new(GlobalJobQueue::default());
|
||||
}
|
||||
|
||||
|
||||
#[derive(Default)]
|
||||
struct GlobalJobQueue {
|
||||
jobs: VecDeque<GlobalJob>,
|
||||
}
|
||||
|
||||
pub enum GlobalJob {
|
||||
// To release memory
|
||||
EvictSomeLayer,
|
||||
|
||||
// To advance 'disk_consistent_lsn'
|
||||
CheckpointTimeline(ZTenantId, ZTimelineId),
|
||||
|
||||
// To free up disk space
|
||||
GarbageCollect(ZTenantId),
|
||||
}
|
||||
|
||||
pub fn schedule_job(job: GlobalJob) {
|
||||
let mut queue = JOB_QUEUE.lock().unwrap();
|
||||
queue.jobs.push_back(job);
|
||||
}
|
||||
|
||||
///
|
||||
/// Launch the global job handler thread
|
||||
///
|
||||
/// TODO: This ought to be a pool of threads
|
||||
///
|
||||
pub fn launch_global_job_thread(conf: &'static PageServerConf) -> JoinHandle<()> {
|
||||
std::thread::Builder::new()
|
||||
.name("Global Job thread".into())
|
||||
.spawn(move || {
|
||||
// FIXME: relaunch it? Panic is not good.
|
||||
|
||||
global_job_loop(conf).expect("Global job thread died");
|
||||
})
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
pub fn global_job_loop(conf: &'static PageServerConf) -> Result<()> {
|
||||
while !tenant_mgr::shutdown_requested() {
|
||||
std::thread::sleep(conf.checkpoint_period);
|
||||
info!("global job thread waking up");
|
||||
|
||||
let mut queue = JOB_QUEUE.lock().unwrap();
|
||||
while let Some(job) = queue.jobs.pop_front() {
|
||||
drop(queue);
|
||||
|
||||
let result = match job {
|
||||
GlobalJob::EvictSomeLayer => {
|
||||
evict_layer()
|
||||
},
|
||||
GlobalJob::CheckpointTimeline(tenantid, timelineid) => {
|
||||
checkpoint_timeline(tenantid, timelineid)
|
||||
}
|
||||
GlobalJob::GarbageCollect(tenantid) => {
|
||||
gc_tenant(tenantid)
|
||||
}
|
||||
};
|
||||
|
||||
if let Err(err) = result {
|
||||
error!("job ended in error: {:#}", err);
|
||||
}
|
||||
|
||||
queue = JOB_QUEUE.lock().unwrap();
|
||||
}
|
||||
}
|
||||
trace!("Checkpointer thread shut down");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Freeze and write out an in-memory layer
|
||||
fn evict_layer() -> Result<()>
|
||||
{
|
||||
// Pick a victim
|
||||
while let Some(layer_id) = layer_map::find_victim() {
|
||||
let victim_layer = match layer_map::get_layer(layer_id) {
|
||||
Some(l) => l,
|
||||
None => continue,
|
||||
};
|
||||
|
||||
let tenantid = victim_layer.get_tenant_id();
|
||||
let timelineid = victim_layer.get_timeline_id();
|
||||
|
||||
let _entered = info_span!("global evict", timeline = %timelineid, tenant = %tenantid)
|
||||
.entered();
|
||||
|
||||
info!("evicting {}", victim_layer.filename().display());
|
||||
|
||||
drop(victim_layer);
|
||||
|
||||
let timeline = tenant_mgr::get_timeline_for_tenant(tenantid, timelineid)?;
|
||||
|
||||
timeline.upgrade_to_layered_timeline().evict_layer(layer_id)?
|
||||
}
|
||||
info!("no more eviction needed");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn checkpoint_timeline(tenantid: ZTenantId, timelineid: ZTimelineId) -> Result<()> {
|
||||
let timeline = tenant_mgr::get_timeline_for_tenant(tenantid, timelineid)?;
|
||||
|
||||
timeline.checkpoint_scheduled()
|
||||
}
|
||||
|
||||
fn gc_tenant(tenantid: ZTenantId) -> Result<()> {
|
||||
let tenant = tenant_mgr::get_repository_for_tenant(tenantid)?;
|
||||
|
||||
tenant.gc_scheduled()?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -9,6 +9,23 @@
|
||||
//! new image and delta layers and corresponding files are written to disk.
|
||||
//!
|
||||
|
||||
|
||||
|
||||
//
|
||||
// Global layer registry:
|
||||
//
|
||||
// Every layer is inserted into the global registry, and assigned an ID
|
||||
//
|
||||
// The global registry tracks memory usage and usage count for each layer
|
||||
//
|
||||
//
|
||||
// In addition to that, there is a per-timeline LayerMap, used for lookups
|
||||
//
|
||||
//
|
||||
|
||||
|
||||
|
||||
use crate::layered_repository::{schedule_job, GlobalJob};
|
||||
use crate::layered_repository::interval_tree::{IntervalItem, IntervalIter, IntervalTree};
|
||||
use crate::layered_repository::storage_layer::{Layer, SegmentTag};
|
||||
use crate::layered_repository::InMemoryLayer;
|
||||
@@ -17,7 +34,8 @@ use anyhow::Result;
|
||||
use lazy_static::lazy_static;
|
||||
use std::cmp::Ordering;
|
||||
use std::collections::{BinaryHeap, HashMap};
|
||||
use std::sync::Arc;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use tracing::*;
|
||||
use zenith_metrics::{register_int_gauge, IntGauge};
|
||||
use zenith_utils::lsn::Lsn;
|
||||
|
||||
@@ -28,6 +46,176 @@ lazy_static! {
|
||||
static ref NUM_ONDISK_LAYERS: IntGauge =
|
||||
register_int_gauge!("pageserver_ondisk_layers", "Number of layers on-disk")
|
||||
.expect("failed to define a metric");
|
||||
|
||||
// Global layer map
|
||||
static ref LAYERS: Mutex<GlobalLayerMap> = Mutex::new(GlobalLayerMap::new());
|
||||
}
|
||||
|
||||
const MAX_OPEN_LAYERS: usize = 10;
|
||||
|
||||
const MAX_LOADED_LAYERS: usize = 100;
|
||||
|
||||
struct GlobalLayerEntry {
|
||||
tag: u64, // to fix ABA problem
|
||||
layer: Option<Arc<dyn Layer>>,
|
||||
usage_count: u32,
|
||||
}
|
||||
|
||||
struct GlobalLayerMap {
|
||||
open_layers: Vec<GlobalLayerEntry>,
|
||||
clock_arm: u32,
|
||||
|
||||
num_open_layers: usize,
|
||||
eviction_scheduled: bool,
|
||||
|
||||
historic_layers: Vec<GlobalLayerEntry>,
|
||||
}
|
||||
|
||||
impl GlobalLayerMap {
|
||||
pub fn new() -> GlobalLayerMap {
|
||||
GlobalLayerMap {
|
||||
open_layers: Vec::new(),
|
||||
clock_arm: 0,
|
||||
historic_layers: Vec::new(),
|
||||
eviction_scheduled: false,
|
||||
num_open_layers: 0,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get(&mut self, layer_id: LayerId) -> Option<Arc<dyn Layer>> {
|
||||
let e = if layer_id.is_historic() {
|
||||
let idx = (layer_id.index - 1) as usize;
|
||||
&mut self.historic_layers[idx]
|
||||
} else {
|
||||
let idx = ((-layer_id.index) - 1) as usize;
|
||||
&mut self.open_layers[idx]
|
||||
};
|
||||
if e.usage_count < 5 {
|
||||
e.usage_count += 1;
|
||||
}
|
||||
|
||||
e.layer.clone()
|
||||
}
|
||||
|
||||
pub fn insert_open(&mut self, layer: Arc<InMemoryLayer>) -> LayerId {
|
||||
let index = -(self.open_layers.len() as isize + 1);
|
||||
|
||||
let entry = GlobalLayerEntry {
|
||||
layer: Some(layer),
|
||||
usage_count: 1,
|
||||
tag: 1,
|
||||
};
|
||||
let tag = entry.tag;
|
||||
self.open_layers.push(entry);
|
||||
self.num_open_layers += 1;
|
||||
|
||||
NUM_INMEMORY_LAYERS.inc();
|
||||
|
||||
if !self.eviction_scheduled && self.num_open_layers >= MAX_OPEN_LAYERS {
|
||||
info!("scheduling global eviction");
|
||||
schedule_job(GlobalJob::EvictSomeLayer);
|
||||
self.eviction_scheduled = true;
|
||||
}
|
||||
|
||||
LayerId {
|
||||
index,
|
||||
tag,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn insert_historic(&mut self, layer: Arc<dyn Layer>) -> LayerId {
|
||||
let index = self.historic_layers.len() as isize + 1;
|
||||
|
||||
let entry = GlobalLayerEntry {
|
||||
layer: Some(layer),
|
||||
usage_count: 1,
|
||||
tag: 1,
|
||||
};
|
||||
let tag = entry.tag;
|
||||
self.historic_layers.push(entry);
|
||||
|
||||
NUM_ONDISK_LAYERS.inc();
|
||||
|
||||
LayerId {
|
||||
index,
|
||||
tag,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn remove(&mut self, layer_id: LayerId) -> Option<Arc<dyn Layer>> {
|
||||
let old_layer;
|
||||
|
||||
if layer_id.is_historic() {
|
||||
let idx = (layer_id.index - 1) as usize;
|
||||
old_layer = self.historic_layers[idx].layer.take();
|
||||
if old_layer.is_some() {
|
||||
NUM_ONDISK_LAYERS.dec();
|
||||
}
|
||||
} else {
|
||||
let idx = ((-layer_id.index) - 1) as usize;
|
||||
old_layer = self.open_layers[idx].layer.take();
|
||||
if old_layer.is_some() {
|
||||
NUM_INMEMORY_LAYERS.dec();
|
||||
self.num_open_layers -= 1;
|
||||
}
|
||||
}
|
||||
old_layer
|
||||
}
|
||||
|
||||
pub fn find_victim(&mut self) -> Option<LayerId> {
|
||||
// run the clock algorithm among all open layers
|
||||
for _ in 0..self.open_layers.len() * 5 {
|
||||
self.clock_arm += 1;
|
||||
if self.clock_arm >= self.open_layers.len() as u32 {
|
||||
self.clock_arm = 0;
|
||||
}
|
||||
let next = self.clock_arm as usize;
|
||||
|
||||
if self.open_layers[next].usage_count == 0 {
|
||||
return Some(LayerId {
|
||||
index: -((next + 1) as isize),
|
||||
tag: self.open_layers[next].tag,
|
||||
});
|
||||
} else {
|
||||
self.open_layers[next].usage_count -= 1;
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub fn find_victim() -> Option<LayerId> {
|
||||
let mut l = LAYERS.lock().unwrap();
|
||||
|
||||
if l.num_open_layers >= MAX_OPEN_LAYERS {
|
||||
if let Some(x) = l.find_victim() {
|
||||
info!("found victim out of {} open layers", l.num_open_layers);
|
||||
Some(x)
|
||||
} else {
|
||||
info!("no victim found at {} open layers", l.num_open_layers);
|
||||
None
|
||||
}
|
||||
} else {
|
||||
info!("no victim needed at {} open layers", l.num_open_layers);
|
||||
l.eviction_scheduled = false;
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_layer(layer_id: LayerId) -> Option<Arc<dyn Layer>> {
|
||||
LAYERS.lock().unwrap().get(layer_id)
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, PartialEq, Eq)]
|
||||
pub struct LayerId {
|
||||
index: isize,
|
||||
tag: u64
|
||||
}
|
||||
|
||||
impl LayerId {
|
||||
pub fn is_historic(&self) -> bool {
|
||||
self.index > 0
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
@@ -41,7 +229,7 @@ pub struct LayerMap {
|
||||
/// All in-memory layers, ordered by 'oldest_pending_lsn' and generation
|
||||
/// of each layer. This allows easy access to the in-memory layer that
|
||||
/// contains the oldest WAL record.
|
||||
open_layers: BinaryHeap<OpenLayerEntry>,
|
||||
open_layers: BinaryHeap<OpenLayerHeapEntry>,
|
||||
|
||||
/// Generation number, used to distinguish newly inserted entries in the
|
||||
/// binary heap from older entries during checkpoint.
|
||||
@@ -61,23 +249,32 @@ impl LayerMap {
|
||||
segentry.get(lsn)
|
||||
}
|
||||
|
||||
pub fn get_with_id(&self, layer_id: LayerId) -> Option<Arc<dyn Layer>> {
|
||||
// TODO: check that it belongs to this tenant+timeline
|
||||
LAYERS.lock().unwrap().get(layer_id)
|
||||
}
|
||||
|
||||
///
|
||||
/// Get the open layer for given segment for writing. Or None if no open
|
||||
/// layer exists.
|
||||
///
|
||||
pub fn get_open(&self, tag: &SegmentTag) -> Option<Arc<InMemoryLayer>> {
|
||||
pub fn get_open(&self, tag: &SegmentTag) -> Option<Arc<dyn Layer>> {
|
||||
let segentry = self.segs.get(tag)?;
|
||||
|
||||
segentry.open.as_ref().map(Arc::clone)
|
||||
let (layer_id, _start_lsn) = segentry.open?;
|
||||
LAYERS.lock().unwrap().get(layer_id)
|
||||
}
|
||||
|
||||
///
|
||||
/// Insert an open in-memory layer
|
||||
///
|
||||
pub fn insert_open(&mut self, layer: Arc<InMemoryLayer>) {
|
||||
|
||||
let layer_id = LAYERS.lock().unwrap().insert_open(Arc::clone(&layer));
|
||||
|
||||
let segentry = self.segs.entry(layer.get_seg_tag()).or_default();
|
||||
|
||||
segentry.update_open(Arc::clone(&layer));
|
||||
segentry.update_open(layer_id, layer.get_start_lsn());
|
||||
|
||||
let oldest_pending_lsn = layer.get_oldest_pending_lsn();
|
||||
|
||||
@@ -87,58 +284,52 @@ impl LayerMap {
|
||||
assert!(oldest_pending_lsn.is_aligned());
|
||||
|
||||
// Also add it to the binary heap
|
||||
let open_layer_entry = OpenLayerEntry {
|
||||
let open_layer_entry = OpenLayerHeapEntry {
|
||||
oldest_pending_lsn: layer.get_oldest_pending_lsn(),
|
||||
layer,
|
||||
layer_id,
|
||||
generation: self.current_generation,
|
||||
};
|
||||
self.open_layers.push(open_layer_entry);
|
||||
|
||||
NUM_INMEMORY_LAYERS.inc();
|
||||
}
|
||||
|
||||
/// Remove the oldest in-memory layer
|
||||
pub fn pop_oldest_open(&mut self) {
|
||||
// Pop it from the binary heap
|
||||
let oldest_entry = self.open_layers.pop().unwrap();
|
||||
let segtag = oldest_entry.layer.get_seg_tag();
|
||||
/// Remove a layer
|
||||
pub fn remove(&mut self, layer_id: LayerId) {
|
||||
if let Some(layer) = LAYERS.lock().unwrap().remove(layer_id) {
|
||||
// Also remove it from the SegEntry of this segment
|
||||
if layer_id.is_historic() {
|
||||
let tag = layer.get_seg_tag();
|
||||
|
||||
// Also remove it from the SegEntry of this segment
|
||||
let mut segentry = self.segs.get_mut(&segtag).unwrap();
|
||||
if Arc::ptr_eq(segentry.open.as_ref().unwrap(), &oldest_entry.layer) {
|
||||
segentry.open = None;
|
||||
} else {
|
||||
// We could have already updated segentry.open for
|
||||
// dropped (non-writeable) layer. This is fine.
|
||||
assert!(!oldest_entry.layer.is_writeable());
|
||||
assert!(oldest_entry.layer.is_dropped());
|
||||
if let Some(segentry) = self.segs.get_mut(&tag) {
|
||||
segentry.historic.remove(&HistoricLayerIntervalTreeEntry::new(layer_id, layer));
|
||||
}
|
||||
} else {
|
||||
let segtag = layer.get_seg_tag();
|
||||
let mut segentry = self.segs.get_mut(&segtag).unwrap();
|
||||
if let Some(open) = segentry.open {
|
||||
if open.0 == layer_id {
|
||||
segentry.open = None;
|
||||
}
|
||||
} else {
|
||||
// We could have already updated segentry.open for
|
||||
// dropped (non-writeable) layer. This is fine.
|
||||
//assert!(!layer.is_writeable());
|
||||
//assert!(layer.is_dropped());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
NUM_INMEMORY_LAYERS.dec();
|
||||
}
|
||||
|
||||
///
|
||||
/// Insert an on-disk layer
|
||||
///
|
||||
pub fn insert_historic(&mut self, layer: Arc<dyn Layer>) {
|
||||
pub fn insert_historic(&mut self, layer: Arc<dyn Layer>) -> LayerId {
|
||||
|
||||
let layer_id = LAYERS.lock().unwrap().insert_historic(Arc::clone(&layer));
|
||||
|
||||
let segentry = self.segs.entry(layer.get_seg_tag()).or_default();
|
||||
segentry.insert_historic(layer);
|
||||
segentry.insert_historic(layer_id, layer);
|
||||
|
||||
NUM_ONDISK_LAYERS.inc();
|
||||
}
|
||||
|
||||
///
|
||||
/// Remove an on-disk layer from the map.
|
||||
///
|
||||
/// This should be called when the corresponding file on disk has been deleted.
|
||||
///
|
||||
pub fn remove_historic(&mut self, layer: Arc<dyn Layer>) {
|
||||
let tag = layer.get_seg_tag();
|
||||
|
||||
if let Some(segentry) = self.segs.get_mut(&tag) {
|
||||
segentry.historic.remove(&layer);
|
||||
}
|
||||
NUM_ONDISK_LAYERS.dec();
|
||||
layer_id
|
||||
}
|
||||
|
||||
// List relations along with a flag that marks if they exist at the given lsn.
|
||||
@@ -199,10 +390,18 @@ impl LayerMap {
|
||||
}
|
||||
|
||||
/// Return the oldest in-memory layer, along with its generation number.
|
||||
pub fn peek_oldest_open(&self) -> Option<(Arc<InMemoryLayer>, u64)> {
|
||||
self.open_layers
|
||||
.peek()
|
||||
.map(|oldest_entry| (Arc::clone(&oldest_entry.layer), oldest_entry.generation))
|
||||
pub fn peek_oldest_open(&mut self) -> Option<(LayerId, Arc<dyn Layer>, u64)> {
|
||||
|
||||
while let Some(oldest_entry) = self.open_layers.peek() {
|
||||
if let Some(layer) = LAYERS.lock().unwrap().get(oldest_entry.layer_id) {
|
||||
return Some((oldest_entry.layer_id,
|
||||
layer,
|
||||
oldest_entry.generation));
|
||||
} else {
|
||||
self.open_layers.pop();
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
/// Increment the generation number used to stamp open in-memory layers. Layers
|
||||
@@ -220,6 +419,7 @@ impl LayerMap {
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
/// debugging function to print out the contents of the layer map
|
||||
#[allow(unused)]
|
||||
pub fn dump(&self) -> Result<()> {
|
||||
@@ -236,16 +436,39 @@ impl LayerMap {
|
||||
println!("End dump LayerMap");
|
||||
Ok(())
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
||||
impl IntervalItem for dyn Layer {
|
||||
#[derive(Clone)]
|
||||
struct HistoricLayerIntervalTreeEntry {
|
||||
layer_id: LayerId,
|
||||
start_lsn: Lsn,
|
||||
end_lsn: Lsn,
|
||||
}
|
||||
|
||||
impl HistoricLayerIntervalTreeEntry {
|
||||
fn new(layer_id: LayerId, layer: Arc<dyn Layer>) -> HistoricLayerIntervalTreeEntry{
|
||||
HistoricLayerIntervalTreeEntry {
|
||||
layer_id,
|
||||
start_lsn: layer.get_start_lsn(),
|
||||
end_lsn: layer.get_end_lsn(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialEq for HistoricLayerIntervalTreeEntry {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
self.layer_id == other.layer_id
|
||||
}
|
||||
}
|
||||
impl IntervalItem for HistoricLayerIntervalTreeEntry {
|
||||
type Key = Lsn;
|
||||
|
||||
fn start_key(&self) -> Lsn {
|
||||
self.get_start_lsn()
|
||||
self.start_lsn
|
||||
}
|
||||
fn end_key(&self) -> Lsn {
|
||||
self.get_end_lsn()
|
||||
self.end_lsn
|
||||
}
|
||||
}
|
||||
|
||||
@@ -259,8 +482,8 @@ impl IntervalItem for dyn Layer {
|
||||
/// IntervalTree.
|
||||
#[derive(Default)]
|
||||
struct SegEntry {
|
||||
open: Option<Arc<InMemoryLayer>>,
|
||||
historic: IntervalTree<dyn Layer>,
|
||||
open: Option<(LayerId, Lsn)>,
|
||||
historic: IntervalTree<HistoricLayerIntervalTreeEntry>,
|
||||
}
|
||||
|
||||
impl SegEntry {
|
||||
@@ -276,13 +499,18 @@ impl SegEntry {
|
||||
|
||||
pub fn get(&self, lsn: Lsn) -> Option<Arc<dyn Layer>> {
|
||||
if let Some(open) = &self.open {
|
||||
if open.get_start_lsn() <= lsn {
|
||||
let x: Arc<dyn Layer> = Arc::clone(open) as _;
|
||||
return Some(x);
|
||||
if let Some(layer) = LAYERS.lock().unwrap().get(open.0) {
|
||||
if layer.get_start_lsn() <= lsn {
|
||||
return Some(layer);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.historic.search(lsn)
|
||||
if let Some(historic) = self.historic.search(lsn) {
|
||||
Some(LAYERS.lock().unwrap().get(historic.layer_id).unwrap())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub fn newer_image_layer_exists(&self, lsn: Lsn) -> bool {
|
||||
@@ -291,21 +519,25 @@ impl SegEntry {
|
||||
|
||||
self.historic
|
||||
.iter_newer(lsn)
|
||||
.any(|layer| !layer.is_incremental())
|
||||
.any(|e| {
|
||||
let layer = LAYERS.lock().unwrap().get(e.layer_id).unwrap();
|
||||
!layer.is_incremental()
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
// Set new open layer for a SegEntry.
|
||||
// It's ok to rewrite previous open layer,
|
||||
// but only if it is not writeable anymore.
|
||||
pub fn update_open(&mut self, layer: Arc<InMemoryLayer>) {
|
||||
if let Some(prev_open) = &self.open {
|
||||
assert!(!prev_open.is_writeable());
|
||||
pub fn update_open(&mut self, layer_id: LayerId, start_lsn: Lsn) {
|
||||
if let Some(_prev_open) = &self.open {
|
||||
//assert!(!prev_open.is_writeable());
|
||||
}
|
||||
self.open = Some(layer);
|
||||
self.open = Some((layer_id, start_lsn));
|
||||
}
|
||||
|
||||
pub fn insert_historic(&mut self, layer: Arc<dyn Layer>) {
|
||||
self.historic.insert(layer);
|
||||
pub fn insert_historic(&mut self, layer_id: LayerId, layer: Arc<dyn Layer>) {
|
||||
self.historic.insert(&HistoricLayerIntervalTreeEntry::new(layer_id, layer));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -315,12 +547,12 @@ impl SegEntry {
|
||||
/// The generation number associated with each entry can be used to distinguish
|
||||
/// recently-added entries (i.e after last call to increment_generation()) from older
|
||||
/// entries with the same 'oldest_pending_lsn'.
|
||||
struct OpenLayerEntry {
|
||||
struct OpenLayerHeapEntry {
|
||||
pub oldest_pending_lsn: Lsn, // copy of layer.get_oldest_pending_lsn()
|
||||
pub generation: u64,
|
||||
pub layer: Arc<InMemoryLayer>,
|
||||
pub layer_id: LayerId,
|
||||
}
|
||||
impl Ord for OpenLayerEntry {
|
||||
impl Ord for OpenLayerHeapEntry {
|
||||
fn cmp(&self, other: &Self) -> Ordering {
|
||||
// BinaryHeap is a max-heap, and we want a min-heap. Reverse the ordering here
|
||||
// to get that. Entries with identical oldest_pending_lsn are ordered by generation
|
||||
@@ -330,32 +562,33 @@ impl Ord for OpenLayerEntry {
|
||||
.then_with(|| other.generation.cmp(&self.generation))
|
||||
}
|
||||
}
|
||||
impl PartialOrd for OpenLayerEntry {
|
||||
impl PartialOrd for OpenLayerHeapEntry {
|
||||
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
|
||||
Some(self.cmp(other))
|
||||
}
|
||||
}
|
||||
impl PartialEq for OpenLayerEntry {
|
||||
impl PartialEq for OpenLayerHeapEntry {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
self.cmp(other) == Ordering::Equal
|
||||
}
|
||||
}
|
||||
impl Eq for OpenLayerEntry {}
|
||||
impl Eq for OpenLayerHeapEntry {}
|
||||
|
||||
/// Iterator returned by LayerMap::iter_historic_layers()
|
||||
pub struct HistoricLayerIter<'a> {
|
||||
seg_iter: std::collections::hash_map::Iter<'a, SegmentTag, SegEntry>,
|
||||
iter: Option<IntervalIter<'a, dyn Layer>>,
|
||||
iter: Option<IntervalIter<'a, HistoricLayerIntervalTreeEntry>>,
|
||||
}
|
||||
|
||||
impl<'a> Iterator for HistoricLayerIter<'a> {
|
||||
type Item = Arc<dyn Layer>;
|
||||
type Item = (LayerId, Arc<dyn Layer>);
|
||||
|
||||
fn next(&mut self) -> std::option::Option<<Self as std::iter::Iterator>::Item> {
|
||||
loop {
|
||||
if let Some(x) = &mut self.iter {
|
||||
if let Some(x) = x.next() {
|
||||
return Some(Arc::clone(&x));
|
||||
let layer = LAYERS.lock().unwrap().get(x.layer_id).unwrap();
|
||||
return Some((x.layer_id, layer));
|
||||
}
|
||||
}
|
||||
if let Some((_tag, segentry)) = self.seg_iter.next() {
|
||||
@@ -426,10 +659,10 @@ mod tests {
|
||||
// A helper function (closure) to pop the next oldest open entry from the layer map,
|
||||
// and assert that it is what we'd expect
|
||||
let mut assert_pop_layer = |expected_segno: u32, expected_generation: u64| {
|
||||
let (l, generation) = layers.peek_oldest_open().unwrap();
|
||||
let (layer_id, l, generation) = layers.peek_oldest_open().unwrap();
|
||||
assert!(l.get_seg_tag().segno == expected_segno);
|
||||
assert!(generation == expected_generation);
|
||||
layers.pop_oldest_open();
|
||||
layers.remove(layer_id);
|
||||
};
|
||||
|
||||
assert_pop_layer(0, gen1); // 0x100
|
||||
|
||||
@@ -2,9 +2,10 @@
|
||||
//! Common traits and structs for layers
|
||||
//!
|
||||
|
||||
use crate::layered_repository::InMemoryLayer;
|
||||
use crate::relish::RelishTag;
|
||||
use crate::repository::WALRecord;
|
||||
use crate::ZTimelineId;
|
||||
use crate::{ZTenantId, ZTimelineId};
|
||||
use anyhow::Result;
|
||||
use bytes::Bytes;
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -104,6 +105,14 @@ pub enum PageReconstructResult {
|
||||
/// in-memory and on-disk layers.
|
||||
///
|
||||
pub trait Layer: Send + Sync {
|
||||
|
||||
fn upgrade_to_inmemory_layer(&self) -> Option<&InMemoryLayer> {
|
||||
None
|
||||
}
|
||||
|
||||
/// Identify the timeline this relish belongs to
|
||||
fn get_tenant_id(&self) -> ZTenantId;
|
||||
|
||||
/// Identify the timeline this relish belongs to
|
||||
fn get_timeline_id(&self) -> ZTimelineId;
|
||||
|
||||
|
||||
@@ -21,6 +21,7 @@ pub mod tenant_mgr;
|
||||
pub mod waldecoder;
|
||||
pub mod walreceiver;
|
||||
pub mod walredo;
|
||||
pub mod vfd;
|
||||
|
||||
pub mod defaults {
|
||||
use const_format::formatcp;
|
||||
|
||||
@@ -690,7 +690,7 @@ impl postgres_backend::Handler for PageServerHandler {
|
||||
|
||||
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
|
||||
|
||||
let result = repo.gc_iteration(Some(timelineid), gc_horizon, true)?;
|
||||
let result = repo.gc_manual(Some(timelineid), gc_horizon, true)?;
|
||||
|
||||
pgb.write_message_noflush(&BeMessage::RowDescription(&[
|
||||
RowDescriptor::int8_col(b"layer_relfiles_total"),
|
||||
|
||||
@@ -33,12 +33,16 @@ pub trait Repository: Send + Sync {
|
||||
/// `checkpoint_before_gc` parameter is used to force compaction of storage before CG
|
||||
/// to make tests more deterministic.
|
||||
/// TODO Do we still need it or we can call checkpoint explicitly in tests where needed?
|
||||
fn gc_iteration(
|
||||
fn gc_manual(
|
||||
&self,
|
||||
timelineid: Option<ZTimelineId>,
|
||||
horizon: u64,
|
||||
checkpoint_before_gc: bool,
|
||||
) -> Result<GcResult>;
|
||||
|
||||
fn gc_scheduled(&self) -> Result<GcResult>;
|
||||
|
||||
fn upgrade_to_layered_repository(&self) -> &crate::layered_repository::LayeredRepository;
|
||||
}
|
||||
|
||||
///
|
||||
@@ -144,7 +148,9 @@ pub trait Timeline: Send + Sync {
|
||||
///
|
||||
/// NOTE: This has nothing to do with checkpoint in PostgreSQL. We don't
|
||||
/// know anything about them here in the repository.
|
||||
fn checkpoint(&self) -> Result<()>;
|
||||
fn checkpoint_forced(&self) -> Result<()>;
|
||||
|
||||
fn checkpoint_scheduled(&self) -> Result<()>;
|
||||
|
||||
/// Retrieve current logical size of the timeline
|
||||
///
|
||||
@@ -155,6 +161,8 @@ pub trait Timeline: Send + Sync {
|
||||
/// Does the same as get_current_logical_size but counted on demand.
|
||||
/// Used in tests to ensure thet incremental and non incremental variants match.
|
||||
fn get_current_logical_size_non_incremental(&self, lsn: Lsn) -> Result<usize>;
|
||||
|
||||
fn upgrade_to_layered_timeline(&self) -> &crate::layered_repository::LayeredTimeline;
|
||||
}
|
||||
|
||||
/// Various functions to mutate the timeline.
|
||||
@@ -714,8 +722,8 @@ mod tests {
|
||||
.contains(&TESTREL_A));
|
||||
|
||||
// Run checkpoint and garbage collection and check that it's still not visible
|
||||
newtline.checkpoint()?;
|
||||
repo.gc_iteration(Some(NEW_TIMELINE_ID), 0, true)?;
|
||||
newtline.checkpoint_forced()?;
|
||||
repo.gc_manual(Some(NEW_TIMELINE_ID), 0, true)?;
|
||||
|
||||
assert!(!newtline
|
||||
.list_rels(0, TESTDB, Lsn(0x40))?
|
||||
|
||||
@@ -106,17 +106,6 @@ fn init_repo(conf: &'static PageServerConf, tenant_id: ZTenantId) {
|
||||
true,
|
||||
));
|
||||
|
||||
let checkpointer_handle = LayeredRepository::launch_checkpointer_thread(conf, repo.clone());
|
||||
let gc_handle = LayeredRepository::launch_gc_thread(conf, repo.clone());
|
||||
|
||||
let mut handles = TENANT_HANDLES.lock().unwrap();
|
||||
let h = TenantHandleEntry {
|
||||
checkpointer_handle: Some(checkpointer_handle),
|
||||
gc_handle: Some(gc_handle),
|
||||
};
|
||||
|
||||
handles.insert(tenant_id, h);
|
||||
|
||||
let mut m = access_tenants();
|
||||
let tenant = m.get_mut(&tenant_id).unwrap();
|
||||
tenant.repo = Some(repo);
|
||||
|
||||
155
pageserver/src/vfd.rs
Normal file
155
pageserver/src/vfd.rs
Normal file
@@ -0,0 +1,155 @@
|
||||
use std::fs::File;
|
||||
use std::io::Seek;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::Mutex;
|
||||
|
||||
use lazy_static::lazy_static;
|
||||
|
||||
const INVALID_TAG: u64 = u64::MAX;
|
||||
|
||||
struct OpenFiles {
|
||||
next: usize,
|
||||
files: Vec<OpenFile>,
|
||||
}
|
||||
|
||||
lazy_static! {
|
||||
static ref OPEN_FILES: Mutex<OpenFiles> = Mutex::new(OpenFiles {
|
||||
next: 0,
|
||||
files: Vec::new(),
|
||||
});
|
||||
}
|
||||
|
||||
struct OpenFile {
|
||||
tag: u64,
|
||||
file: Option<File>,
|
||||
}
|
||||
|
||||
pub struct VirtualFile {
|
||||
vfd: usize,
|
||||
tag: u64,
|
||||
|
||||
path: PathBuf,
|
||||
}
|
||||
|
||||
impl VirtualFile {
|
||||
|
||||
pub fn new(path: &Path) -> VirtualFile {
|
||||
VirtualFile {
|
||||
vfd: 0,
|
||||
tag: INVALID_TAG,
|
||||
path: path.to_path_buf(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn open(&mut self) -> std::io::Result<File> {
|
||||
|
||||
let mut l = OPEN_FILES.lock().unwrap();
|
||||
|
||||
if self.vfd < l.files.len() && l.files[self.vfd].tag == self.tag {
|
||||
|
||||
if let Some(mut file) = l.files[self.vfd].file.take() {
|
||||
// return cached File
|
||||
file.rewind()?;
|
||||
return Ok(file);
|
||||
}
|
||||
}
|
||||
File::open(&self.path)
|
||||
}
|
||||
|
||||
pub fn cache(&mut self, file: File) {
|
||||
|
||||
let mut l = OPEN_FILES.lock().unwrap();
|
||||
|
||||
let next = if l.next >= l.files.len() {
|
||||
if l.files.len() < 100 {
|
||||
l.files.push(OpenFile {
|
||||
tag: 0,
|
||||
file: None
|
||||
});
|
||||
l.files.len() - 1
|
||||
} else {
|
||||
// wrap around
|
||||
0
|
||||
}
|
||||
} else {
|
||||
l.next
|
||||
};
|
||||
l.next = next + 1;
|
||||
|
||||
l.files[next].file.replace(file);
|
||||
l.files[next].tag += 1;
|
||||
|
||||
self.vfd = next;
|
||||
self.tag = l.files[next].tag;
|
||||
|
||||
drop(l);
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for VirtualFile {
|
||||
fn drop(&mut self) {
|
||||
|
||||
// Close file if it's still open
|
||||
|
||||
if self.tag != INVALID_TAG {
|
||||
let mut l = OPEN_FILES.lock().unwrap();
|
||||
|
||||
if self.vfd < l.files.len() && l.files[self.vfd].tag == self.tag {
|
||||
l.files[self.vfd].file.take();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::PageServerConf;
|
||||
use super::*;
|
||||
use std::io::Read;
|
||||
|
||||
#[test]
|
||||
fn test_vfd() -> anyhow::Result<()> {
|
||||
|
||||
let mut vfiles = Vec::new();
|
||||
|
||||
let test_dir = PageServerConf::test_repo_dir("test_vfd");
|
||||
let _ = std::fs::remove_dir_all(&test_dir);
|
||||
std::fs::create_dir_all(&test_dir)?;
|
||||
|
||||
for i in 0..2000 {
|
||||
let path = test_dir.join(format!("vfd_test{}", i));
|
||||
let content = format!("foobar{}", i);
|
||||
|
||||
std::fs::write(&path, &content)?;
|
||||
|
||||
let vfile = VirtualFile::new(&path);
|
||||
|
||||
vfiles.push((vfile, path, content));
|
||||
}
|
||||
|
||||
for i in 0..vfiles.len() {
|
||||
let (ref mut vfile, _path, expected_content) = &mut vfiles[i];
|
||||
let mut s = String::new();
|
||||
|
||||
let mut file = vfile.open()?;
|
||||
file.read_to_string(&mut s)?;
|
||||
|
||||
assert!(&s == expected_content);
|
||||
|
||||
vfile.cache(file);
|
||||
|
||||
s.clear();
|
||||
let (ref mut vfile, _path, expected_content) = &mut vfiles[0];
|
||||
let mut file = vfile.open()?;
|
||||
file.read_to_string(&mut s)?;
|
||||
|
||||
assert!(&s == expected_content);
|
||||
|
||||
vfile.cache(file);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -251,6 +251,9 @@ fn walreceiver_main(
|
||||
last_rec_lsn = lsn;
|
||||
}
|
||||
|
||||
timeline.upgrade_to_layered_timeline().schedule_checkpoint_if_needed()?;
|
||||
timeline.upgrade_to_layered_timeline().schedule_gc_if_needed()?;
|
||||
|
||||
if !caught_up && endlsn >= end_of_wal {
|
||||
info!("caught up at LSN {}", endlsn);
|
||||
caught_up = true;
|
||||
|
||||
@@ -28,19 +28,21 @@ use std::fs::OpenOptions;
|
||||
use std::io::prelude::*;
|
||||
use std::io::Error;
|
||||
use std::path::PathBuf;
|
||||
|
||||
use std::process::{ChildStdin, ChildStdout, ChildStderr, Command};
|
||||
use std::process::Stdio;
|
||||
use std::sync::Mutex;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
use tokio::io::AsyncBufReadExt;
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::process::{ChildStdin, ChildStdout, Command};
|
||||
use tokio::time::timeout;
|
||||
use zenith_metrics::{register_histogram, register_int_counter, Histogram, IntCounter};
|
||||
use zenith_utils::bin_ser::BeSer;
|
||||
use zenith_utils::lsn::Lsn;
|
||||
use zenith_utils::zid::ZTenantId;
|
||||
|
||||
use std::os::unix::io::AsRawFd;
|
||||
use nix::poll::*;
|
||||
|
||||
use crate::relish::*;
|
||||
use crate::repository::WALRecord;
|
||||
use crate::waldecoder::XlMultiXactCreate;
|
||||
@@ -133,14 +135,14 @@ lazy_static! {
|
||||
/// perform WAL replay. Only one thread can use the processs at a time,
|
||||
/// that is controlled by the Mutex. In the future, we might want to
|
||||
/// launch a pool of processes to allow concurrent replay of multiple
|
||||
/// records.
|
||||
/// records. FIXME we have a pool now
|
||||
///
|
||||
pub struct PostgresRedoManager {
|
||||
tenantid: ZTenantId,
|
||||
conf: &'static PageServerConf,
|
||||
|
||||
runtime: tokio::runtime::Runtime,
|
||||
process: Mutex<Option<PostgresRedoProcess>>,
|
||||
processes: Vec<Mutex<Option<PostgresRedoProcess>>>,
|
||||
next: AtomicUsize,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -210,21 +212,18 @@ impl WalRedoManager for PostgresRedoManager {
|
||||
end_time = Instant::now();
|
||||
WAL_REDO_TIME.observe(end_time.duration_since(start_time).as_secs_f64());
|
||||
} else {
|
||||
let mut process_guard = self.process.lock().unwrap();
|
||||
let process_no = self.next.fetch_add(1, Ordering::AcqRel) % self.processes.len();
|
||||
let mut process_guard = self.processes[process_no].lock().unwrap();
|
||||
let lock_time = Instant::now();
|
||||
|
||||
// launch the WAL redo process on first use
|
||||
if process_guard.is_none() {
|
||||
let p = self
|
||||
.runtime
|
||||
.block_on(PostgresRedoProcess::launch(self.conf, &self.tenantid))?;
|
||||
let p = PostgresRedoProcess::launch(self.conf, process_no, &self.tenantid)?;
|
||||
*process_guard = Some(p);
|
||||
}
|
||||
let process = process_guard.as_mut().unwrap();
|
||||
|
||||
result = self
|
||||
.runtime
|
||||
.block_on(self.handle_apply_request_postgres(process, &request));
|
||||
result = self.handle_apply_request_postgres(process, &request);
|
||||
|
||||
WAL_REDO_WAIT_TIME.observe(lock_time.duration_since(start_time).as_secs_f64());
|
||||
end_time = Instant::now();
|
||||
@@ -240,27 +239,24 @@ impl PostgresRedoManager {
|
||||
/// Create a new PostgresRedoManager.
|
||||
///
|
||||
pub fn new(conf: &'static PageServerConf, tenantid: ZTenantId) -> PostgresRedoManager {
|
||||
// We block on waiting for requests on the walredo request channel, but
|
||||
// use async I/O to communicate with the child process. Initialize the
|
||||
// runtime for the async part.
|
||||
let runtime = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap();
|
||||
let mut processes: Vec<Mutex<Option<PostgresRedoProcess>>> = Vec::new();
|
||||
for _ in 1..10 {
|
||||
processes.push(Mutex::new(None));
|
||||
}
|
||||
|
||||
// The actual process is launched lazily, on first request.
|
||||
PostgresRedoManager {
|
||||
runtime,
|
||||
tenantid,
|
||||
conf,
|
||||
process: Mutex::new(None),
|
||||
processes,
|
||||
next: AtomicUsize::new(0),
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// Process one request for WAL redo using wal-redo postgres
|
||||
///
|
||||
async fn handle_apply_request_postgres(
|
||||
fn handle_apply_request_postgres(
|
||||
&self,
|
||||
process: &mut PostgresRedoProcess,
|
||||
request: &WalRedoRequest,
|
||||
@@ -278,14 +274,14 @@ impl PostgresRedoManager {
|
||||
if let RelishTag::Relation(rel) = request.rel {
|
||||
// Relational WAL records are applied using wal-redo-postgres
|
||||
let buf_tag = BufferTag { rel, blknum };
|
||||
apply_result = process.apply_wal_records(buf_tag, base_img, records).await;
|
||||
apply_result = process.apply_wal_records(buf_tag, base_img, records);
|
||||
|
||||
let duration = start.elapsed();
|
||||
|
||||
debug!(
|
||||
"postgres applied {} WAL records in {} ms to reconstruct page image at LSN {}",
|
||||
trace!(
|
||||
"postgres applied {} WAL records in {} us to reconstruct page image at LSN {}",
|
||||
nrecords,
|
||||
duration.as_millis(),
|
||||
duration.as_micros(),
|
||||
lsn
|
||||
);
|
||||
|
||||
@@ -471,20 +467,22 @@ impl PostgresRedoManager {
|
||||
struct PostgresRedoProcess {
|
||||
stdin: ChildStdin,
|
||||
stdout: ChildStdout,
|
||||
stderr: ChildStderr,
|
||||
}
|
||||
|
||||
impl PostgresRedoProcess {
|
||||
//
|
||||
// Start postgres binary in special WAL redo mode.
|
||||
//
|
||||
async fn launch(
|
||||
fn launch(
|
||||
conf: &PageServerConf,
|
||||
process_no: usize,
|
||||
tenantid: &ZTenantId,
|
||||
) -> Result<PostgresRedoProcess, Error> {
|
||||
// FIXME: We need a dummy Postgres cluster to run the process in. Currently, we
|
||||
// just create one with constant name. That fails if you try to launch more than
|
||||
// one WAL redo manager concurrently.
|
||||
let datadir = conf.tenant_path(tenantid).join("wal-redo-datadir");
|
||||
let datadir = conf.tenant_path(tenantid).join(format!("wal-redo-datadir-{}", process_no));
|
||||
|
||||
// Create empty data directory for wal-redo postgres, deleting old one first.
|
||||
if datadir.exists() {
|
||||
@@ -501,7 +499,6 @@ impl PostgresRedoProcess {
|
||||
.env("LD_LIBRARY_PATH", conf.pg_lib_dir().to_str().unwrap())
|
||||
.env("DYLD_LIBRARY_PATH", conf.pg_lib_dir().to_str().unwrap())
|
||||
.output()
|
||||
.await
|
||||
.expect("failed to execute initdb");
|
||||
|
||||
if !initdb.status.success() {
|
||||
@@ -538,102 +535,106 @@ impl PostgresRedoProcess {
|
||||
datadir.display()
|
||||
);
|
||||
|
||||
let stdin = child.stdin.take().expect("failed to open child's stdin");
|
||||
let stderr = child.stderr.take().expect("failed to open child's stderr");
|
||||
let stdout = child.stdout.take().expect("failed to open child's stdout");
|
||||
|
||||
// This async block reads the child's stderr, and forwards it to the logger
|
||||
let f_stderr = async {
|
||||
let mut stderr_buffered = tokio::io::BufReader::new(stderr);
|
||||
|
||||
let mut line = String::new();
|
||||
loop {
|
||||
let res = stderr_buffered.read_line(&mut line).await;
|
||||
if res.is_err() {
|
||||
debug!("could not convert line to utf-8");
|
||||
continue;
|
||||
}
|
||||
if res.unwrap() == 0 {
|
||||
break;
|
||||
}
|
||||
error!("wal-redo-postgres: {}", line.trim());
|
||||
line.clear();
|
||||
}
|
||||
Ok::<(), Error>(())
|
||||
};
|
||||
tokio::spawn(f_stderr);
|
||||
|
||||
Ok(PostgresRedoProcess { stdin, stdout })
|
||||
let stdin = child.stdin.take().unwrap();
|
||||
let stdout = child.stdout.take().unwrap();
|
||||
let stderr = child.stderr.take().unwrap();
|
||||
Ok(PostgresRedoProcess { stdin, stdout, stderr })
|
||||
}
|
||||
|
||||
//
|
||||
// Apply given WAL records ('records') over an old page image. Returns
|
||||
// new page image.
|
||||
//
|
||||
async fn apply_wal_records(
|
||||
fn apply_wal_records(
|
||||
&mut self,
|
||||
tag: BufferTag,
|
||||
base_img: Option<Bytes>,
|
||||
records: &[(Lsn, WALRecord)],
|
||||
) -> Result<Bytes, std::io::Error> {
|
||||
let stdout = &mut self.stdout;
|
||||
// Buffer the writes to avoid a lot of small syscalls.
|
||||
let mut stdin = tokio::io::BufWriter::new(&mut self.stdin);
|
||||
|
||||
// We do three things simultaneously: send the old base image and WAL records to
|
||||
// the child process's stdin, read the result from child's stdout, and forward any logging
|
||||
// information that the child writes to its stderr to the page server's log.
|
||||
//
|
||||
// 'f_stdin' handles writing the base image and WAL records to the child process.
|
||||
// 'f_stdout' below reads the result back. And 'f_stderr', which was spawned into the
|
||||
// tokio runtime in the 'launch' function already, forwards the logging.
|
||||
let f_stdin = async {
|
||||
// Send base image, if any. (If the record initializes the page, previous page
|
||||
// version is not needed.)
|
||||
timeout(
|
||||
TIMEOUT,
|
||||
stdin.write_all(&build_begin_redo_for_block_msg(tag)),
|
||||
)
|
||||
.await??;
|
||||
if let Some(img) = base_img {
|
||||
timeout(TIMEOUT, stdin.write_all(&build_push_page_msg(tag, &img))).await??;
|
||||
// Send base image, if any. (If the record initializes the page, previous page
|
||||
// version is not needed.)
|
||||
let mut buf: Vec<u8> = Vec::new();
|
||||
build_begin_redo_for_block_msg(tag, &mut buf);
|
||||
if let Some(img) = base_img {
|
||||
build_push_page_msg(tag, &img, &mut buf);
|
||||
}
|
||||
|
||||
// Send WAL records.
|
||||
for (lsn, rec) in records.iter() {
|
||||
WAL_REDO_RECORD_COUNTER.inc();
|
||||
|
||||
build_apply_record_msg(*lsn, &rec.rec, &mut buf);
|
||||
|
||||
//debug!("sent WAL record to wal redo postgres process ({:X}/{:X}",
|
||||
// r.lsn >> 32, r.lsn & 0xffff_ffff);
|
||||
}
|
||||
//debug!("sent {} WAL records to wal redo postgres process ({:X}/{:X}",
|
||||
// records.len(), lsn >> 32, lsn & 0xffff_ffff);
|
||||
|
||||
// Send GetPage command to get the result back
|
||||
build_get_page_msg(tag, &mut buf);
|
||||
|
||||
|
||||
// The input is now in 'buf'.
|
||||
|
||||
let mut nwrite = 0;
|
||||
|
||||
let mut resultbuf = Vec::new();
|
||||
resultbuf.resize(8192, 0);
|
||||
|
||||
let mut nresult = 0;
|
||||
|
||||
let mut pollfds = [
|
||||
PollFd::new(self.stdout.as_raw_fd(), PollFlags::POLLIN),
|
||||
PollFd::new(self.stderr.as_raw_fd(), PollFlags::POLLIN),
|
||||
PollFd::new(self.stdin.as_raw_fd(), PollFlags::POLLOUT),
|
||||
];
|
||||
|
||||
// Do a blind write first
|
||||
let n = self.stdin.write(&buf[nwrite..])?;
|
||||
nwrite += n;
|
||||
|
||||
while nresult < 8192 {
|
||||
|
||||
let nfds = if nwrite < buf.len() {
|
||||
3
|
||||
} else {
|
||||
2
|
||||
};
|
||||
nix::poll::poll(&mut pollfds[0..nfds], TIMEOUT.as_millis() as i32)?;
|
||||
|
||||
// We do three things simultaneously: send the old base image and WAL records to
|
||||
// the child process's stdin, read the result from child's stdout, and forward any logging
|
||||
// information that the child writes to its stderr to the page server's log.
|
||||
//
|
||||
// 'f_stdin' handles writing the base image and WAL records to the child process.
|
||||
// 'f_stdout' below reads the result back. And 'f_stderr', which was spawned into the
|
||||
// tokio runtime in the 'launch' function already, forwards the logging.
|
||||
if nwrite < buf.len() && !pollfds[2].revents().unwrap().is_empty() {
|
||||
// stdin
|
||||
let n = self.stdin.write(&buf[nwrite..])?;
|
||||
nwrite += n;
|
||||
}
|
||||
if !pollfds[0].revents().unwrap().is_empty() {
|
||||
// stdout
|
||||
// Read back new page image
|
||||
let n = self.stdout.read(&mut resultbuf[nresult..])?;
|
||||
|
||||
// Send WAL records.
|
||||
for (lsn, rec) in records.iter() {
|
||||
WAL_REDO_RECORD_COUNTER.inc();
|
||||
|
||||
stdin
|
||||
.write_all(&build_apply_record_msg(*lsn, &rec.rec))
|
||||
.await?;
|
||||
|
||||
//debug!("sent WAL record to wal redo postgres process ({:X}/{:X}",
|
||||
// r.lsn >> 32, r.lsn & 0xffff_ffff);
|
||||
nresult += n;
|
||||
}
|
||||
//debug!("sent {} WAL records to wal redo postgres process ({:X}/{:X}",
|
||||
// records.len(), lsn >> 32, lsn & 0xffff_ffff);
|
||||
if !pollfds[1].revents().unwrap().is_empty() {
|
||||
// stderr
|
||||
let mut readbuf: [u8; 16384] = [0; 16384];
|
||||
|
||||
// Send GetPage command to get the result back
|
||||
timeout(TIMEOUT, stdin.write_all(&build_get_page_msg(tag))).await??;
|
||||
timeout(TIMEOUT, stdin.flush()).await??;
|
||||
let n = self.stderr.read(&mut readbuf)?;
|
||||
|
||||
error!("wal-redo-postgres: {}", String::from_utf8_lossy(&readbuf[0..n]));
|
||||
}
|
||||
//debug!("sent GetPage for {}", tag.blknum);
|
||||
Ok::<(), Error>(())
|
||||
};
|
||||
}
|
||||
|
||||
// Read back new page image
|
||||
let f_stdout = async {
|
||||
let mut buf = [0u8; 8192];
|
||||
|
||||
timeout(TIMEOUT, stdout.read_exact(&mut buf)).await??;
|
||||
//debug!("got response for {}", tag.blknum);
|
||||
Ok::<[u8; 8192], Error>(buf)
|
||||
};
|
||||
|
||||
let res = tokio::try_join!(f_stdout, f_stdin)?;
|
||||
|
||||
let buf = res.0;
|
||||
|
||||
Ok::<Bytes, Error>(Bytes::from(std::vec::Vec::from(buf)))
|
||||
Ok(Bytes::from(Vec::from(resultbuf)))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -641,62 +642,50 @@ impl PostgresRedoProcess {
|
||||
// process. See vendor/postgres/src/backend/tcop/zenith_wal_redo.c for
|
||||
// explanation of the protocol.
|
||||
|
||||
fn build_begin_redo_for_block_msg(tag: BufferTag) -> Vec<u8> {
|
||||
fn build_begin_redo_for_block_msg(tag: BufferTag, buf: &mut Vec<u8>) {
|
||||
let len = 4 + 1 + 4 * 4;
|
||||
let mut buf = Vec::with_capacity(1 + len);
|
||||
|
||||
buf.put_u8(b'B');
|
||||
buf.put_u32(len as u32);
|
||||
|
||||
tag.ser_into(&mut buf)
|
||||
tag.ser_into(buf)
|
||||
.expect("serialize BufferTag should always succeed");
|
||||
|
||||
debug_assert!(buf.len() == 1 + len);
|
||||
|
||||
buf
|
||||
//debug_assert!(buf.len() == 1 + len);
|
||||
}
|
||||
|
||||
fn build_push_page_msg(tag: BufferTag, base_img: &[u8]) -> Vec<u8> {
|
||||
fn build_push_page_msg(tag: BufferTag, base_img: &[u8], buf: &mut Vec<u8>) {
|
||||
assert!(base_img.len() == 8192);
|
||||
|
||||
let len = 4 + 1 + 4 * 4 + base_img.len();
|
||||
let mut buf = Vec::with_capacity(1 + len);
|
||||
|
||||
buf.put_u8(b'P');
|
||||
buf.put_u32(len as u32);
|
||||
tag.ser_into(&mut buf)
|
||||
tag.ser_into(buf)
|
||||
.expect("serialize BufferTag should always succeed");
|
||||
buf.put(base_img);
|
||||
|
||||
debug_assert!(buf.len() == 1 + len);
|
||||
|
||||
buf
|
||||
//debug_assert!(buf.len() - oldlen == 1 + len);
|
||||
}
|
||||
|
||||
fn build_apply_record_msg(endlsn: Lsn, rec: &[u8]) -> Vec<u8> {
|
||||
fn build_apply_record_msg(endlsn: Lsn, rec: &[u8], buf: &mut Vec<u8>) {
|
||||
let len = 4 + 8 + rec.len();
|
||||
let mut buf: Vec<u8> = Vec::with_capacity(1 + len);
|
||||
|
||||
buf.put_u8(b'A');
|
||||
buf.put_u32(len as u32);
|
||||
buf.put_u64(endlsn.0);
|
||||
buf.put(rec);
|
||||
|
||||
debug_assert!(buf.len() == 1 + len);
|
||||
|
||||
buf
|
||||
//debug_assert!(buf.len() - oldlen == 1 + len);
|
||||
}
|
||||
|
||||
fn build_get_page_msg(tag: BufferTag) -> Vec<u8> {
|
||||
fn build_get_page_msg(tag: BufferTag, buf: &mut Vec<u8>) {
|
||||
let len = 4 + 1 + 4 * 4;
|
||||
let mut buf = Vec::with_capacity(1 + len);
|
||||
|
||||
buf.put_u8(b'G');
|
||||
buf.put_u32(len as u32);
|
||||
tag.ser_into(&mut buf)
|
||||
tag.ser_into(buf)
|
||||
.expect("serialize BufferTag should always succeed");
|
||||
|
||||
debug_assert!(buf.len() == 1 + len);
|
||||
|
||||
buf
|
||||
//debug_assert!(buf.len() == 1 + len);
|
||||
}
|
||||
|
||||
@@ -203,96 +203,38 @@ fn main() -> Result<()> {
|
||||
)
|
||||
.get_matches();
|
||||
|
||||
// Create config file
|
||||
if let ("init", Some(init_match)) = matches.subcommand() {
|
||||
let toml_file: String = if let Some(config_path) = init_match.value_of("config") {
|
||||
// load and parse the file
|
||||
std::fs::read_to_string(std::path::Path::new(config_path))
|
||||
.with_context(|| format!("Could not read configuration file \"{}\"", config_path))?
|
||||
} else {
|
||||
// Built-in default config
|
||||
default_conf()
|
||||
let (sub_name, sub_args) = matches.subcommand();
|
||||
let sub_args = sub_args.expect("no subcommand");
|
||||
|
||||
// Check for 'zenith init' command first.
|
||||
let subcmd_result = if sub_name == "init" {
|
||||
handle_init(sub_args)
|
||||
} else {
|
||||
// all other commands need an existing config
|
||||
let env = match LocalEnv::load_config() {
|
||||
Ok(conf) => conf,
|
||||
Err(e) => {
|
||||
eprintln!("Error loading config: {}", e);
|
||||
exit(1);
|
||||
}
|
||||
};
|
||||
|
||||
let mut env = LocalEnv::create_config(&toml_file)
|
||||
.with_context(|| "Failed to create zenith configuration")?;
|
||||
env.init()
|
||||
.with_context(|| "Failed to initialize zenith repository")?;
|
||||
match sub_name {
|
||||
"tenant" => handle_tenant(sub_args, &env),
|
||||
"branch" => handle_branch(sub_args, &env),
|
||||
"start" => handle_start_all(sub_args, &env),
|
||||
"stop" => handle_stop_all(sub_args, &env),
|
||||
"pageserver" => handle_pageserver(sub_args, &env),
|
||||
"pg" => handle_pg(sub_args, &env),
|
||||
"safekeeper" => handle_safekeeper(sub_args, &env),
|
||||
_ => bail!("unexpected subcommand {}", sub_name),
|
||||
}
|
||||
};
|
||||
if let Err(e) = subcmd_result {
|
||||
eprintln!("command failed: {}", e);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
// all other commands would need config
|
||||
let env = match LocalEnv::load_config() {
|
||||
Ok(conf) => conf,
|
||||
Err(e) => {
|
||||
eprintln!("Error loading config: {}", e);
|
||||
exit(1);
|
||||
}
|
||||
};
|
||||
|
||||
match matches.subcommand() {
|
||||
("init", Some(_sub_m)) => {
|
||||
// The options were handled above already
|
||||
let pageserver = PageServerNode::from_env(&env);
|
||||
if let Err(e) = pageserver.init(
|
||||
// default_tenantid was generated by the `env.init()` call above
|
||||
Some(&env.default_tenantid.unwrap().to_string()),
|
||||
) {
|
||||
eprintln!("pageserver init failed: {}", e);
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
("tenant", Some(args)) => {
|
||||
if let Err(e) = handle_tenant(args, &env) {
|
||||
eprintln!("tenant command failed: {}", e);
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
("branch", Some(sub_args)) => {
|
||||
if let Err(e) = handle_branch(sub_args, &env) {
|
||||
eprintln!("branch command failed: {}", e);
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
("start", Some(sub_match)) => {
|
||||
if let Err(e) = handle_start_all(sub_match, &env) {
|
||||
eprintln!("start command failed: {}", e);
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
("stop", Some(sub_match)) => {
|
||||
if let Err(e) = handle_stop_all(sub_match, &env) {
|
||||
eprintln!("stop command failed: {}", e);
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
("pageserver", Some(sub_match)) => {
|
||||
if let Err(e) = handle_pageserver(sub_match, &env) {
|
||||
eprintln!("pageserver command failed: {}", e);
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
("pg", Some(pg_match)) => {
|
||||
if let Err(e) = handle_pg(pg_match, &env) {
|
||||
eprintln!("pg operation failed: {:?}", e);
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
("safekeeper", Some(sub_match)) => {
|
||||
if let Err(e) = handle_safekeeper(sub_match, &env) {
|
||||
eprintln!("safekeeper command failed: {}", e);
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
_ => {}
|
||||
};
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -438,6 +380,35 @@ fn get_tenantid(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<ZTe
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_init(init_match: &ArgMatches) -> Result<()> {
|
||||
// Create config file
|
||||
let toml_file: String = if let Some(config_path) = init_match.value_of("config") {
|
||||
// load and parse the file
|
||||
std::fs::read_to_string(std::path::Path::new(config_path))
|
||||
.with_context(|| format!("Could not read configuration file \"{}\"", config_path))?
|
||||
} else {
|
||||
// Built-in default config
|
||||
default_conf()
|
||||
};
|
||||
|
||||
let mut env = LocalEnv::create_config(&toml_file)
|
||||
.with_context(|| "Failed to create zenith configuration")?;
|
||||
env.init()
|
||||
.with_context(|| "Failed to initialize zenith repository")?;
|
||||
|
||||
// Call 'pageserver init'.
|
||||
let pageserver = PageServerNode::from_env(&env);
|
||||
if let Err(e) = pageserver.init(
|
||||
// default_tenantid was generated by the `env.init()` call above
|
||||
Some(&env.default_tenantid.unwrap().to_string()),
|
||||
) {
|
||||
eprintln!("pageserver init failed: {}", e);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn handle_tenant(tenant_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
|
||||
let pageserver = PageServerNode::from_env(env);
|
||||
match tenant_match.subcommand() {
|
||||
@@ -486,10 +457,11 @@ fn handle_branch(branch_match: &ArgMatches, env: &local_env::LocalEnv) -> Result
|
||||
fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
|
||||
let mut cplane = ComputeControlPlane::load(env.clone())?;
|
||||
|
||||
match pg_match.subcommand() {
|
||||
("list", Some(list_match)) => {
|
||||
let tenantid = get_tenantid(list_match, env)?;
|
||||
// All subcommands take an optional --tenantid option
|
||||
let tenantid = get_tenantid(pg_match, env)?;
|
||||
|
||||
match pg_match.subcommand() {
|
||||
("list", Some(_list_match)) => {
|
||||
let branch_infos = get_branch_infos(env, &tenantid).unwrap_or_else(|e| {
|
||||
eprintln!("Failed to load branch info: {}", e);
|
||||
HashMap::new()
|
||||
@@ -520,7 +492,6 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
|
||||
}
|
||||
}
|
||||
("create", Some(create_match)) => {
|
||||
let tenantid = get_tenantid(create_match, env)?;
|
||||
let node_name = create_match.value_of("node").unwrap_or("main");
|
||||
let timeline_name = create_match.value_of("timeline").unwrap_or(node_name);
|
||||
|
||||
@@ -531,7 +502,6 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
|
||||
cplane.new_node(tenantid, node_name, timeline_name, port)?;
|
||||
}
|
||||
("start", Some(start_match)) => {
|
||||
let tenantid = get_tenantid(start_match, env)?;
|
||||
let node_name = start_match.value_of("node").unwrap_or("main");
|
||||
let timeline_name = start_match.value_of("timeline");
|
||||
|
||||
@@ -572,7 +542,6 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
|
||||
}
|
||||
}
|
||||
("stop", Some(stop_match)) => {
|
||||
let tenantid = get_tenantid(stop_match, env)?;
|
||||
let node_name = stop_match.value_of("node").unwrap_or("main");
|
||||
let destroy = stop_match.is_present("destroy");
|
||||
|
||||
@@ -636,26 +605,23 @@ fn get_safekeeper(env: &local_env::LocalEnv, name: &str) -> Result<SafekeeperNod
|
||||
}
|
||||
|
||||
fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
|
||||
match sub_match.subcommand() {
|
||||
("start", Some(sub_match)) => {
|
||||
let node_name = sub_match
|
||||
.value_of("node")
|
||||
.unwrap_or(DEFAULT_SAFEKEEPER_NAME);
|
||||
let safekeeper = get_safekeeper(env, node_name)?;
|
||||
let (sub_name, sub_args) = sub_match.subcommand();
|
||||
let sub_args = sub_args.expect("no safekeeper subcommand");
|
||||
|
||||
// All the commands take an optional safekeeper name argument
|
||||
let node_name = sub_args.value_of("node").unwrap_or(DEFAULT_SAFEKEEPER_NAME);
|
||||
let safekeeper = get_safekeeper(env, node_name)?;
|
||||
|
||||
match sub_name {
|
||||
"start" => {
|
||||
if let Err(e) = safekeeper.start() {
|
||||
eprintln!("safekeeper start failed: {}", e);
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
("stop", Some(sub_match)) => {
|
||||
let node_name = sub_match
|
||||
.value_of("node")
|
||||
.unwrap_or(DEFAULT_SAFEKEEPER_NAME);
|
||||
let immediate = sub_match.value_of("stop-mode") == Some("immediate");
|
||||
|
||||
let safekeeper = get_safekeeper(env, node_name)?;
|
||||
"stop" => {
|
||||
let immediate = sub_args.value_of("stop-mode") == Some("immediate");
|
||||
|
||||
if let Err(e) = safekeeper.stop(immediate) {
|
||||
eprintln!("safekeeper stop failed: {}", e);
|
||||
@@ -663,15 +629,10 @@ fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul
|
||||
}
|
||||
}
|
||||
|
||||
("restart", Some(sub_match)) => {
|
||||
let node_name = sub_match
|
||||
.value_of("node")
|
||||
.unwrap_or(DEFAULT_SAFEKEEPER_NAME);
|
||||
"restart" => {
|
||||
let immediate = sub_args.value_of("stop-mode") == Some("immediate");
|
||||
|
||||
let safekeeper = get_safekeeper(env, node_name)?;
|
||||
|
||||
//TODO what shutdown strategy should we use here?
|
||||
if let Err(e) = safekeeper.stop(false) {
|
||||
if let Err(e) = safekeeper.stop(immediate) {
|
||||
eprintln!("safekeeper stop failed: {}", e);
|
||||
exit(1);
|
||||
}
|
||||
@@ -682,7 +643,9 @@ fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul
|
||||
}
|
||||
}
|
||||
|
||||
_ => {}
|
||||
_ => {
|
||||
bail!("Unexpected safekeeper subcommand '{}'", sub_name)
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user