WIP: Use poll()

Heikki Linnakangas
2021-10-28 16:23:51 +03:00
parent 23713eb44f
commit 7cf7215ce2
7 changed files with 378 additions and 237 deletions

Cargo.lock

@@ -1193,6 +1193,7 @@ dependencies = [
"hyper",
"lazy_static",
"log",
+ "nix",
"postgres",
"postgres-protocol",
"postgres-types",


@@ -37,6 +37,7 @@ async-trait = "0.1"
const_format = "0.2.21"
tracing = "0.1.27"
signal-hook = {version = "0.3.10", features = ["extended-siginfo"] }
+nix = "0.23"
postgres_ffi = { path = "../postgres_ffi" }
zenith_metrics = { path = "../zenith_metrics" }
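The new nix dependency is what the walredo rewrite below uses for poll(2). A minimal sketch of the API surface involved, assuming the nix 0.23 interface (wait_readable is a hypothetical helper, not code from this commit):

use nix::poll::{poll, PollFd, PollFlags};
use std::os::unix::io::RawFd;

// Wait until `fd` is readable or `timeout_ms` elapses. poll() returns the
// number of ready descriptors; the readiness itself is in revents().
fn wait_readable(fd: RawFd, timeout_ms: i32) -> nix::Result<bool> {
    let mut fds = [PollFd::new(fd, PollFlags::POLLIN)];
    poll(&mut fds, timeout_ms)?;
    Ok(fds[0].revents().map_or(false, |r| r.contains(PollFlags::POLLIN)))
}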


@@ -66,7 +66,7 @@ use delta_layer::DeltaLayer;
use image_layer::ImageLayer;
use inmemory_layer::InMemoryLayer;
-use layer_map::LayerMap;
+use layer_map::{LayerId, LayerMap};
use storage_layer::{
Layer, PageReconstructData, PageReconstructResult, SegmentTag, RELISH_SEG_SIZE,
};
@@ -1278,7 +1278,7 @@ impl LayeredTimeline {
let mut created_historics = false;
let mut layer_uploads = Vec::new();
-        while let Some((oldest_layer, oldest_generation)) = layers.peek_oldest_open() {
+        while let Some((oldest_layer_id, oldest_layer, oldest_generation)) = layers.peek_oldest_open() {
let oldest_pending_lsn = oldest_layer.get_oldest_pending_lsn();
if tenant_mgr::shutdown_requested() && !forced {
@@ -1315,8 +1315,8 @@ impl LayeredTimeline {
// The layer is no longer open, update the layer map to reflect this.
// We will replace it with on-disk historics below.
-            layers.pop_oldest_open();
-            layers.insert_historic(oldest_layer.clone());
+            layers.remove(oldest_layer_id);
+            let oldest_layer_id = layers.insert_historic(oldest_layer.clone());
            // Write the now-frozen layer to disk. That could take a while, so release the lock while we do it
drop(layers);
@@ -1332,7 +1332,7 @@ impl LayeredTimeline {
}
// Finally, replace the frozen in-memory layer with the new on-disk layers
-            layers.remove_historic(oldest_layer);
+            layers.remove(oldest_layer_id);
// Add the historics to the LayerMap
for delta_layer in new_historics.delta_layers {
@@ -1348,7 +1348,7 @@ impl LayeredTimeline {
// Call unload() on all frozen layers, to release memory.
// This shouldn't be much memory, as only metadata is slurped
// into memory.
-        for layer in layers.iter_historic_layers() {
+        for (_layer_id, layer) in layers.iter_historic_layers() {
layer.unload()?;
}
@@ -1440,7 +1440,7 @@ impl LayeredTimeline {
debug!("retain_lsns: {:?}", retain_lsns);
-        let mut layers_to_remove: Vec<Arc<dyn Layer>> = Vec::new();
+        let mut layers_to_remove: Vec<LayerId> = Vec::new();
// Scan all on-disk layers in the timeline.
//
@@ -1451,7 +1451,7 @@ impl LayeredTimeline {
// 4. this layer doesn't serve as a tombstone for some older layer;
//
let mut layers = self.layers.lock().unwrap();
-        'outer: for l in layers.iter_historic_layers() {
+        'outer: for (layer_id, l) in layers.iter_historic_layers() {
let seg = l.get_seg_tag();
if seg.rel.is_relation() {
@@ -1593,16 +1593,16 @@ impl LayeredTimeline {
l.get_end_lsn(),
l.is_dropped()
);
-                layers_to_remove.push(Arc::clone(&l));
+                layers_to_remove.push(layer_id);
}
// Actually delete the layers from disk and remove them from the map.
// (couldn't do this in the loop above, because you cannot modify a collection
// while iterating it. BTreeMap::retain() would be another option)
-        for doomed_layer in layers_to_remove {
+        for doomed_layer_id in layers_to_remove {
+            let doomed_layer = layers.get_with_id(doomed_layer_id);
doomed_layer.delete()?;
-            layers.remove_historic(doomed_layer.clone());
+            layers.remove(doomed_layer_id);
match (
doomed_layer.is_dropped(),
doomed_layer.get_seg_tag().rel.is_relation(),
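As the comment in the hunk above notes, the map cannot be modified while it is being iterated, hence the collect-then-remove pattern over LayerIds. A standalone illustration of that pattern and the retain() alternative the comment mentions (plain BTreeMap, not the commit's LayerMap):

use std::collections::BTreeMap;

fn main() {
    let mut layers: BTreeMap<u64, String> =
        (0..5u64).map(|id| (id, format!("layer-{}", id))).collect();

    // Collect the keys first, then remove: nothing is mutated while the
    // iterator is live.
    let doomed: Vec<u64> = layers
        .iter()
        .filter(|(_, name)| name.ends_with('3'))
        .map(|(id, _)| *id)
        .collect();
    for id in doomed {
        layers.remove(&id);
    }

    // Equivalent in one pass with retain().
    layers.retain(|_, name| !name.ends_with('4'));
    assert_eq!(layers.len(), 3);
}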


@@ -41,23 +41,22 @@
use std::collections::BTreeMap;
use std::fmt::Debug;
use std::ops::Range;
-use std::sync::Arc;
-pub struct IntervalTree<I: ?Sized>
+pub struct IntervalTree<I>
where
I: IntervalItem,
{
points: BTreeMap<I::Key, Point<I>>,
}
-struct Point<I: ?Sized> {
+struct Point<I> {
/// All intervals that contain this point, in no particular order.
///
    /// We assume that there aren't a lot of overlapping intervals, so that this vector
/// never grows very large. If that assumption doesn't hold, we could keep this ordered
/// by the end bound, to speed up `search`. But as long as there are only a few elements,
/// a linear search is OK.
-    elements: Vec<Arc<I>>,
+    elements: Vec<I>,
}
/// Abstraction for an interval that can be stored in the tree
@@ -75,14 +74,14 @@ pub trait IntervalItem {
}
}
-impl<I: ?Sized> IntervalTree<I>
+impl<I> IntervalTree<I>
where
-    I: IntervalItem,
+    I: IntervalItem + PartialEq + Clone,
{
/// Return an element that contains 'key', or precedes it.
///
/// If there are multiple candidates, returns the one with the highest 'end' key.
-    pub fn search(&self, key: I::Key) -> Option<Arc<I>> {
+    pub fn search(&self, key: I::Key) -> Option<&I> {
// Find the greatest point that precedes or is equal to the search key. If there is
// none, returns None.
let (_, p) = self.points.range(..=key).next_back()?;
@@ -100,7 +99,7 @@ where
}
})
.unwrap();
-        Some(Arc::clone(highest_item))
+        Some(highest_item)
}
/// Iterate over all items with start bound >= 'key'
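To make the search() contract concrete, here is a sketch of a test (not part of the commit) written against the new by-reference API and the MockItem helper from the test module further down; it assumes MockItem's Display output is the "start-end" form the existing tests compare against:

#[test]
fn search_returns_highest_end_key() {
    let mut tree: IntervalTree<MockItem> = IntervalTree::default();
    tree.insert(&MockItem::new(10, 20));
    tree.insert(&MockItem::new(15, 30));

    // Both intervals contain 17; the one with the higher end bound wins.
    assert_eq!(tree.search(17).unwrap().to_string(), "15-30");
    // No interval contains 40, so the closest preceding one is returned.
    assert_eq!(tree.search(40).unwrap().to_string(), "15-30");
    // Nothing contains or precedes 5.
    assert!(tree.search(5).is_none());
}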
@@ -119,7 +118,7 @@ where
}
}
-    pub fn insert(&mut self, item: Arc<I>) {
+    pub fn insert(&mut self, item: &I) {
let start_key = item.start_key();
let end_key = item.end_key();
assert!(start_key < end_key);
@@ -133,18 +132,18 @@ where
found_start_point = true;
// It is an error to insert the same item to the tree twice.
assert!(
-                !point.elements.iter().any(|x| Arc::ptr_eq(x, &item)),
+                !point.elements.iter().any(|x| x == item),
"interval is already in the tree"
);
}
-            point.elements.push(Arc::clone(&item));
+            point.elements.push(item.clone());
}
if !found_start_point {
// Create a new Point for the starting point
// Look at the previous point, and copy over elements that overlap with this
// new point
-            let mut new_elements: Vec<Arc<I>> = Vec::new();
+            let mut new_elements: Vec<I> = Vec::new();
if let Some((_, prev_point)) = self.points.range(..start_key).next_back() {
let overlapping_prev_elements = prev_point
.elements
@@ -154,7 +153,7 @@ where
new_elements.extend(overlapping_prev_elements);
}
-            new_elements.push(item);
+            new_elements.push(item.clone());
let new_point = Point {
elements: new_elements,
@@ -163,7 +162,7 @@ where
}
}
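A short worked example of what insert() does to the underlying point map, assuming the behavior implemented above:

// Inserting [10,20) and then [15,30):
//
//   after the first insert:   points = { 10: [ [10,20) ] }
//   after the second insert:  points = { 10: [ [10,20) ],
//                                        15: [ [10,20), [15,30) ] }
//
// The new start point at 15 copies [10,20) over from the preceding point
// at 10, because that interval also covers key 15; search() therefore
// only ever needs to inspect a single point.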
-    pub fn remove(&mut self, item: &Arc<I>) {
+    pub fn remove(&mut self, item: &I) {
// range search points
let start_key = item.start_key();
let end_key = item.end_key();
@@ -176,7 +175,7 @@ where
found_start_point = true;
}
let len_before = point.elements.len();
-            point.elements.retain(|other| !Arc::ptr_eq(other, item));
+            point.elements.retain(|other| other != item);
let len_after = point.elements.len();
assert_eq!(len_after + 1, len_before);
if len_after == 0 {
@@ -191,19 +190,19 @@ where
}
}
-pub struct IntervalIter<'a, I: ?Sized>
+pub struct IntervalIter<'a, I>
where
I: IntervalItem,
{
point_iter: std::collections::btree_map::Range<'a, I::Key, Point<I>>,
-    elem_iter: Option<(I::Key, std::slice::Iter<'a, Arc<I>>)>,
+    elem_iter: Option<(I::Key, std::slice::Iter<'a, I>)>,
}
impl<'a, I> Iterator for IntervalIter<'a, I>
where
-    I: IntervalItem + ?Sized,
+    I: IntervalItem,
{
-    type Item = Arc<I>;
+    type Item = &'a I;
fn next(&mut self) -> Option<Self::Item> {
// Iterate over all elements in all the points in 'point_iter'. To avoid
@@ -214,7 +213,7 @@ where
if let Some((point_key, elem_iter)) = &mut self.elem_iter {
for elem in elem_iter {
if elem.start_key() == *point_key {
-                    return Some(Arc::clone(elem));
+                    return Some(elem);
}
}
}
@@ -230,7 +229,7 @@ where
}
}
-impl<I: ?Sized> Default for IntervalTree<I>
+impl<I> Default for IntervalTree<I>
where
I: IntervalItem,
{
@@ -246,7 +245,7 @@ mod tests {
use super::*;
use std::fmt;
-    #[derive(Debug)]
+    #[derive(Debug, Clone, PartialEq)]
struct MockItem {
start_key: u32,
end_key: u32,
@@ -288,7 +287,7 @@ mod tests {
tree: &IntervalTree<MockItem>,
key: u32,
expected: &[&str],
-    ) -> Option<Arc<MockItem>> {
+    ) -> Option<MockItem> {
if let Some(v) = tree.search(key) {
let vstr = v.to_string();
@@ -299,7 +298,7 @@ mod tests {
key, v, expected,
);
-            Some(v)
+            Some(v.clone())
} else {
assert!(
expected.is_empty(),
@@ -331,12 +330,12 @@ mod tests {
let mut tree: IntervalTree<MockItem> = IntervalTree::default();
// Simple, non-overlapping ranges.
-        tree.insert(Arc::new(MockItem::new(10, 11)));
-        tree.insert(Arc::new(MockItem::new(11, 12)));
-        tree.insert(Arc::new(MockItem::new(12, 13)));
-        tree.insert(Arc::new(MockItem::new(18, 19)));
-        tree.insert(Arc::new(MockItem::new(17, 18)));
-        tree.insert(Arc::new(MockItem::new(15, 16)));
+        tree.insert(&MockItem::new(10, 11));
+        tree.insert(&MockItem::new(11, 12));
+        tree.insert(&MockItem::new(12, 13));
+        tree.insert(&MockItem::new(18, 19));
+        tree.insert(&MockItem::new(17, 18));
+        tree.insert(&MockItem::new(15, 16));
assert_search(&tree, 9, &[]);
assert_search(&tree, 10, &["10-11"]);
@@ -370,13 +369,13 @@ mod tests {
let mut tree: IntervalTree<MockItem> = IntervalTree::default();
// Overlapping items
-        tree.insert(Arc::new(MockItem::new(22, 24)));
-        tree.insert(Arc::new(MockItem::new(23, 25)));
-        let x24_26 = Arc::new(MockItem::new(24, 26));
-        tree.insert(Arc::clone(&x24_26));
-        let x26_28 = Arc::new(MockItem::new(26, 28));
-        tree.insert(Arc::clone(&x26_28));
-        tree.insert(Arc::new(MockItem::new(25, 27)));
+        tree.insert(&MockItem::new(22, 24));
+        tree.insert(&MockItem::new(23, 25));
+        let x24_26 = MockItem::new(24, 26);
+        tree.insert(&x24_26);
+        let x26_28 = MockItem::new(26, 28);
+        tree.insert(&x26_28);
+        tree.insert(&MockItem::new(25, 27));
assert_search(&tree, 22, &["22-24"]);
assert_search(&tree, 23, &["22-24", "23-25"]);
@@ -403,10 +402,10 @@ mod tests {
let mut tree: IntervalTree<MockItem> = IntervalTree::default();
// Items containing other items
-        tree.insert(Arc::new(MockItem::new(31, 39)));
-        tree.insert(Arc::new(MockItem::new(32, 34)));
-        tree.insert(Arc::new(MockItem::new(33, 35)));
-        tree.insert(Arc::new(MockItem::new(30, 40)));
+        tree.insert(&MockItem::new(31, 39));
+        tree.insert(&MockItem::new(32, 34));
+        tree.insert(&MockItem::new(33, 35));
+        tree.insert(&MockItem::new(30, 40));
assert_search(&tree, 30, &["30-40"]);
assert_search(&tree, 31, &["30-40", "31-39"]);
@@ -427,16 +426,16 @@ mod tests {
let mut tree: IntervalTree<MockItem> = IntervalTree::default();
// Duplicate keys
-        let item_a = Arc::new(MockItem::new_str(55, 56, "a"));
-        tree.insert(Arc::clone(&item_a));
-        let item_b = Arc::new(MockItem::new_str(55, 56, "b"));
-        tree.insert(Arc::clone(&item_b));
-        let item_c = Arc::new(MockItem::new_str(55, 56, "c"));
-        tree.insert(Arc::clone(&item_c));
-        let item_d = Arc::new(MockItem::new_str(54, 56, "d"));
-        tree.insert(Arc::clone(&item_d));
-        let item_e = Arc::new(MockItem::new_str(55, 57, "e"));
-        tree.insert(Arc::clone(&item_e));
+        let item_a = MockItem::new_str(55, 56, "a");
+        tree.insert(&item_a);
+        let item_b = MockItem::new_str(55, 56, "b");
+        tree.insert(&item_b);
+        let item_c = MockItem::new_str(55, 56, "c");
+        tree.insert(&item_c);
+        let item_d = MockItem::new_str(54, 56, "d");
+        tree.insert(&item_d);
+        let item_e = MockItem::new_str(55, 57, "e");
+        tree.insert(&item_e);
dump_tree(&tree);
@@ -461,8 +460,8 @@ mod tests {
let mut tree: IntervalTree<MockItem> = IntervalTree::default();
// Inserting the same item twice is not cool
-        let item = Arc::new(MockItem::new(1, 2));
-        tree.insert(Arc::clone(&item));
-        tree.insert(Arc::clone(&item)); // fails assertion
+        let item = MockItem::new(1, 2);
+        tree.insert(&item);
+        tree.insert(&item); // fails assertion
}
}
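The move from Arc<I> to plain values also changes what "the same item" means: Arc::ptr_eq compared pointer identity, while the tree now relies on PartialEq. A standalone illustration of the difference (not from this commit):

use std::sync::Arc;

fn main() {
    let a = Arc::new((10u32, 20u32));
    let b = Arc::new((10u32, 20u32));
    assert!(!Arc::ptr_eq(&a, &b)); // distinct allocations: not "the same"
    assert_eq!(a, b);              // but equal by value
}

The layer map below restores identity-like semantics by implementing PartialEq for its tree entries on the LayerId alone.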


@@ -9,6 +9,22 @@
//! new image and delta layers and corresponding files are written to disk.
//!
+//
+// Global layer registry:
+//
+// Every layer is inserted into the global registry, and assigned an ID
+//
+// The global registry tracks memory usage and usage count for each layer
+//
+//
+// In addition to that, there is a per-timeline LayerMap, used for lookups
+//
+//
use crate::layered_repository::interval_tree::{IntervalItem, IntervalIter, IntervalTree};
use crate::layered_repository::storage_layer::{Layer, SegmentTag};
use crate::layered_repository::InMemoryLayer;
@@ -17,7 +33,7 @@ use anyhow::Result;
use lazy_static::lazy_static;
use std::cmp::Ordering;
use std::collections::{BinaryHeap, HashMap};
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
use zenith_metrics::{register_int_gauge, IntGauge};
use zenith_utils::lsn::Lsn;
@@ -28,8 +44,92 @@ lazy_static! {
static ref NUM_ONDISK_LAYERS: IntGauge =
register_int_gauge!("pageserver_ondisk_layers", "Number of layers on-disk")
.expect("failed to define a metric");
+    // Global layer map
+    static ref LAYERS: Mutex<GlobalLayerMap> = Mutex::new(GlobalLayerMap::new());
}
+const MAX_LOADED_LAYERS: usize = 10;
+
+#[derive(Clone)]
+enum GlobalLayerEntry {
+    InMemory(Arc<InMemoryLayer>),
+    Historic(Arc<dyn Layer>),
+}
+
+struct GlobalLayerMap {
+    layers: HashMap<LayerId, GlobalLayerEntry>,
+    last_id: u64,
+
+    // Layers currently loaded. We run a clock algorithm across these.
+    loaded_layers: Vec<LayerId>,
+}
+
+impl GlobalLayerMap {
+    pub fn new() -> GlobalLayerMap {
+        GlobalLayerMap {
+            layers: HashMap::new(),
+            last_id: 0,
+            loaded_layers: Vec::new(),
+        }
+    }
+
+    pub fn get(&mut self, layer_id: LayerId) -> Arc<dyn Layer> {
+        match self.layers.get(&layer_id) {
+            Some(GlobalLayerEntry::InMemory(layer)) => layer.clone(),
+            Some(GlobalLayerEntry::Historic(layer)) => layer.clone(),
+            None => panic!(),
+        }
+    }
+
+    pub fn get_open(&mut self, layer_id: LayerId) -> Arc<InMemoryLayer> {
+        match self.layers.get(&layer_id) {
+            Some(GlobalLayerEntry::InMemory(layer)) => layer.clone(),
+            Some(GlobalLayerEntry::Historic(_layer)) => panic!(),
+            None => panic!(),
+        }
+    }
+
+    pub fn insert_open(&mut self, layer: Arc<InMemoryLayer>) -> LayerId {
+        let layer_id = LayerId(self.last_id);
+        self.last_id += 1;
+        self.layers.insert(layer_id, GlobalLayerEntry::InMemory(layer));
+        layer_id
+    }
+
+    pub fn insert_historic(&mut self, layer: Arc<dyn Layer>) -> LayerId {
+        let layer_id = LayerId(self.last_id);
+        self.last_id += 1;
+        self.layers.insert(layer_id, GlobalLayerEntry::Historic(layer));
+        layer_id
+    }
+
+    pub fn remove(&mut self, layer_id: LayerId) -> GlobalLayerEntry {
+        if let Some(entry) = self.layers.remove(&layer_id) {
+            let orig_entry = entry.clone();
+            match orig_entry {
+                GlobalLayerEntry::InMemory(_layer) => {
+                    NUM_INMEMORY_LAYERS.dec();
+                }
+                GlobalLayerEntry::Historic(_layer) => {
+                    NUM_ONDISK_LAYERS.dec();
+                }
+            }
+            entry.clone()
+        } else {
+            panic!()
+        }
+    }
+}
+
+#[derive(Clone, Copy, PartialEq, Eq, Hash)]
+pub struct LayerId(u64);
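The loaded_layers vector and MAX_LOADED_LAYERS are scaffolding: the comment says a clock algorithm runs across the loaded layers, but this WIP commit contains no eviction code yet. A hedged sketch of what a second-chance (clock) sweep over those fields might look like; the per-layer usage bits are an assumption, not part of the commit:

struct Clock {
    loaded: Vec<LayerId>, // mirrors GlobalLayerMap::loaded_layers
    usage: std::collections::HashMap<LayerId, bool>, // hypothetical usage bits
    hand: usize,
}

impl Clock {
    // Pick a victim once loaded.len() exceeds MAX_LOADED_LAYERS: advance
    // the hand, giving recently used layers a second chance.
    fn evict_one(&mut self) -> LayerId {
        loop {
            let id = self.loaded[self.hand];
            self.hand = (self.hand + 1) % self.loaded.len();
            let used = self.usage.get_mut(&id).unwrap();
            if *used {
                *used = false; // second chance
            } else {
                return id; // the caller would unload() this layer
            }
        }
    }
}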
///
/// LayerMap tracks what layers exist on a timeline.
///
@@ -41,7 +141,7 @@ pub struct LayerMap {
/// All in-memory layers, ordered by 'oldest_pending_lsn' and generation
/// of each layer. This allows easy access to the in-memory layer that
/// contains the oldest WAL record.
-    open_layers: BinaryHeap<OpenLayerEntry>,
+    open_layers: BinaryHeap<OpenLayerHeapEntry>,
/// Generation number, used to distinguish newly inserted entries in the
/// binary heap from older entries during checkpoint.
@@ -61,6 +161,11 @@ impl LayerMap {
segentry.get(lsn)
}
+    pub fn get_with_id(&self, layer_id: LayerId) -> Arc<dyn Layer> {
+        // TODO: check that it belongs to this tenant+timeline
+        LAYERS.lock().unwrap().get(layer_id)
+    }
///
/// Get the open layer for given segment for writing. Or None if no open
/// layer exists.
@@ -68,16 +173,24 @@ impl LayerMap {
pub fn get_open(&self, tag: &SegmentTag) -> Option<Arc<InMemoryLayer>> {
let segentry = self.segs.get(tag)?;
-        segentry.open.as_ref().map(Arc::clone)
+        if let Some((layer_id, _start_lsn)) = segentry.open {
+            Some(LAYERS.lock().unwrap().get_open(layer_id))
+        } else {
+            None
+        }
}
///
/// Insert an open in-memory layer
///
pub fn insert_open(&mut self, layer: Arc<InMemoryLayer>) {
+        let layer_id = LAYERS.lock().unwrap().insert_open(Arc::clone(&layer));
let segentry = self.segs.entry(layer.get_seg_tag()).or_default();
-        segentry.update_open(Arc::clone(&layer));
+        segentry.update_open(layer_id, layer.get_start_lsn());
let oldest_pending_lsn = layer.get_oldest_pending_lsn();
@@ -87,9 +200,9 @@ impl LayerMap {
assert!(oldest_pending_lsn.is_aligned());
// Also add it to the binary heap
-        let open_layer_entry = OpenLayerEntry {
+        let open_layer_entry = OpenLayerHeapEntry {
oldest_pending_lsn: layer.get_oldest_pending_lsn(),
-            layer,
+            layer_id,
generation: self.current_generation,
};
self.open_layers.push(open_layer_entry);
@@ -98,47 +211,46 @@ impl LayerMap {
}
/// Remove the oldest in-memory layer
-    pub fn pop_oldest_open(&mut self) {
-        // Pop it from the binary heap
-        let oldest_entry = self.open_layers.pop().unwrap();
-        let segtag = oldest_entry.layer.get_seg_tag();
+    pub fn remove(&mut self, layer_id: LayerId) {
+        let layer_entry = LAYERS.lock().unwrap().remove(layer_id);
// Also remove it from the SegEntry of this segment
-        let mut segentry = self.segs.get_mut(&segtag).unwrap();
-        if Arc::ptr_eq(segentry.open.as_ref().unwrap(), &oldest_entry.layer) {
-            segentry.open = None;
-        } else {
-            // We could have already updated segentry.open for
-            // dropped (non-writeable) layer. This is fine.
-            assert!(!oldest_entry.layer.is_writeable());
-            assert!(oldest_entry.layer.is_dropped());
-        }
+        match layer_entry {
+            GlobalLayerEntry::InMemory(layer) => {
+                let tag = layer.get_seg_tag();
+                NUM_INMEMORY_LAYERS.dec();
+                if let Some(segentry) = self.segs.get_mut(&tag) {
+                    segentry.historic.remove(&HistoricLayerIntervalTreeEntry::new(layer_id, layer));
+                }
+            }
+            GlobalLayerEntry::Historic(layer) => {
+                let segtag = layer.get_seg_tag();
+                let mut segentry = self.segs.get_mut(&segtag).unwrap();
+                if let Some(open) = segentry.open {
+                    if open.0 == layer_id {
+                        segentry.open = None;
+                    }
+                } else {
+                    // We could have already updated segentry.open for
+                    // dropped (non-writeable) layer. This is fine.
+                    //assert!(!layer.is_writeable());
+                    //assert!(layer.is_dropped());
+                }
+            }
+        }
}
///
/// Insert an on-disk layer
///
-    pub fn insert_historic(&mut self, layer: Arc<dyn Layer>) {
+    pub fn insert_historic(&mut self, layer: Arc<dyn Layer>) -> LayerId {
+        let layer_id = LAYERS.lock().unwrap().insert_historic(Arc::clone(&layer));
        let segentry = self.segs.entry(layer.get_seg_tag()).or_default();
-        segentry.insert_historic(layer);
+        segentry.insert_historic(layer_id, layer);
        NUM_ONDISK_LAYERS.inc();
+        layer_id
    }
-
-    ///
-    /// Remove an on-disk layer from the map.
-    ///
-    /// This should be called when the corresponding file on disk has been deleted.
-    ///
-    pub fn remove_historic(&mut self, layer: Arc<dyn Layer>) {
-        let tag = layer.get_seg_tag();
-        if let Some(segentry) = self.segs.get_mut(&tag) {
-            segentry.historic.remove(&layer);
-        }
-        NUM_ONDISK_LAYERS.dec();
-    }
// List relations along with a flag that marks if they exist at the given lsn.
@@ -199,10 +311,15 @@ impl LayerMap {
}
/// Return the oldest in-memory layer, along with its generation number.
-    pub fn peek_oldest_open(&self) -> Option<(Arc<InMemoryLayer>, u64)> {
-        self.open_layers
-            .peek()
-            .map(|oldest_entry| (Arc::clone(&oldest_entry.layer), oldest_entry.generation))
+    pub fn peek_oldest_open(&self) -> Option<(LayerId, Arc<InMemoryLayer>, u64)> {
+        if let Some(oldest_entry) = self.open_layers.peek() {
+            Some((oldest_entry.layer_id,
+                  LAYERS.lock().unwrap().get_open(oldest_entry.layer_id),
+                  oldest_entry.generation))
+        } else {
+            None
+        }
}
/// Increment the generation number used to stamp open in-memory layers. Layers
@@ -220,6 +337,7 @@ impl LayerMap {
}
}
+    /*
/// debugging function to print out the contents of the layer map
#[allow(unused)]
pub fn dump(&self) -> Result<()> {
@@ -236,16 +354,39 @@ impl LayerMap {
println!("End dump LayerMap");
Ok(())
}
+    */
}
-impl IntervalItem for dyn Layer {
+#[derive(Clone)]
+struct HistoricLayerIntervalTreeEntry {
+    layer_id: LayerId,
+    start_lsn: Lsn,
+    end_lsn: Lsn,
+}
+
+impl HistoricLayerIntervalTreeEntry {
+    fn new(layer_id: LayerId, layer: Arc<dyn Layer>) -> HistoricLayerIntervalTreeEntry {
+        HistoricLayerIntervalTreeEntry {
+            layer_id,
+            start_lsn: layer.get_start_lsn(),
+            end_lsn: layer.get_end_lsn(),
+        }
+    }
+}
+
+impl PartialEq for HistoricLayerIntervalTreeEntry {
+    fn eq(&self, other: &Self) -> bool {
+        self.layer_id == other.layer_id
+    }
+}
+
+impl IntervalItem for HistoricLayerIntervalTreeEntry {
type Key = Lsn;
fn start_key(&self) -> Lsn {
-        self.get_start_lsn()
+        self.start_lsn
}
fn end_key(&self) -> Lsn {
-        self.get_end_lsn()
+        self.end_lsn
}
}
@@ -259,8 +400,8 @@ impl IntervalItem for dyn Layer {
/// IntervalTree.
#[derive(Default)]
struct SegEntry {
-    open: Option<Arc<InMemoryLayer>>,
-    historic: IntervalTree<dyn Layer>,
+    open: Option<(LayerId, Lsn)>,
+    historic: IntervalTree<HistoricLayerIntervalTreeEntry>,
}
impl SegEntry {
@@ -276,13 +417,17 @@ impl SegEntry {
pub fn get(&self, lsn: Lsn) -> Option<Arc<dyn Layer>> {
if let Some(open) = &self.open {
-            if open.get_start_lsn() <= lsn {
-                let x: Arc<dyn Layer> = Arc::clone(open) as _;
-                return Some(x);
+            let layer = LAYERS.lock().unwrap().get(open.0);
+            if layer.get_start_lsn() <= lsn {
+                return Some(layer);
}
}
-        self.historic.search(lsn)
+        if let Some(historic) = self.historic.search(lsn) {
+            Some(LAYERS.lock().unwrap().get(historic.layer_id))
+        } else {
+            None
+        }
}
pub fn newer_image_layer_exists(&self, lsn: Lsn) -> bool {
@@ -291,21 +436,25 @@ impl SegEntry {
self.historic
.iter_newer(lsn)
-            .any(|layer| !layer.is_incremental())
+            .any(|e| {
+                let layer = LAYERS.lock().unwrap().get(e.layer_id);
+                !layer.is_incremental()
+            })
}
// Set new open layer for a SegEntry.
// It's ok to rewrite previous open layer,
// but only if it is not writeable anymore.
-    pub fn update_open(&mut self, layer: Arc<InMemoryLayer>) {
-        if let Some(prev_open) = &self.open {
-            assert!(!prev_open.is_writeable());
+    pub fn update_open(&mut self, layer_id: LayerId, start_lsn: Lsn) {
+        if let Some(_prev_open) = &self.open {
+            //assert!(!prev_open.is_writeable());
}
-        self.open = Some(layer);
+        self.open = Some((layer_id, start_lsn));
}
-    pub fn insert_historic(&mut self, layer: Arc<dyn Layer>) {
-        self.historic.insert(layer);
+    pub fn insert_historic(&mut self, layer_id: LayerId, layer: Arc<dyn Layer>) {
+        self.historic.insert(&HistoricLayerIntervalTreeEntry::new(layer_id, layer));
}
}
@@ -315,12 +464,12 @@ impl SegEntry {
/// The generation number associated with each entry can be used to distinguish
/// recently-added entries (i.e after last call to increment_generation()) from older
/// entries with the same 'oldest_pending_lsn'.
-struct OpenLayerEntry {
+struct OpenLayerHeapEntry {
pub oldest_pending_lsn: Lsn, // copy of layer.get_oldest_pending_lsn()
pub generation: u64,
-    pub layer: Arc<InMemoryLayer>,
+    pub layer_id: LayerId,
}
-impl Ord for OpenLayerEntry {
+impl Ord for OpenLayerHeapEntry {
fn cmp(&self, other: &Self) -> Ordering {
// BinaryHeap is a max-heap, and we want a min-heap. Reverse the ordering here
// to get that. Entries with identical oldest_pending_lsn are ordered by generation
@@ -330,32 +479,33 @@ impl Ord for OpenLayerEntry {
.then_with(|| other.generation.cmp(&self.generation))
}
}
-impl PartialOrd for OpenLayerEntry {
+impl PartialOrd for OpenLayerHeapEntry {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
-impl PartialEq for OpenLayerEntry {
+impl PartialEq for OpenLayerHeapEntry {
fn eq(&self, other: &Self) -> bool {
self.cmp(other) == Ordering::Equal
}
}
-impl Eq for OpenLayerEntry {}
+impl Eq for OpenLayerHeapEntry {}
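The reversed cmp() above is the usual trick for getting min-heap behavior out of std's max-heap BinaryHeap, so peek() yields the entry with the lowest oldest_pending_lsn. A self-contained illustration (not from the commit):

use std::cmp::Ordering;
use std::collections::BinaryHeap;

#[derive(PartialEq, Eq)]
struct Entry(u64);

impl Ord for Entry {
    fn cmp(&self, other: &Self) -> Ordering {
        other.0.cmp(&self.0) // reversed: the smallest value sorts "greatest"
    }
}

impl PartialOrd for Entry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

fn main() {
    let mut heap = BinaryHeap::new();
    heap.push(Entry(0x300));
    heap.push(Entry(0x100));
    heap.push(Entry(0x200));
    assert_eq!(heap.peek().unwrap().0, 0x100); // oldest entry on top
}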
/// Iterator returned by LayerMap::iter_historic_layers()
pub struct HistoricLayerIter<'a> {
seg_iter: std::collections::hash_map::Iter<'a, SegmentTag, SegEntry>,
-    iter: Option<IntervalIter<'a, dyn Layer>>,
+    iter: Option<IntervalIter<'a, HistoricLayerIntervalTreeEntry>>,
}
impl<'a> Iterator for HistoricLayerIter<'a> {
-    type Item = Arc<dyn Layer>;
+    type Item = (LayerId, Arc<dyn Layer>);
fn next(&mut self) -> std::option::Option<<Self as std::iter::Iterator>::Item> {
loop {
if let Some(x) = &mut self.iter {
if let Some(x) = x.next() {
-                return Some(Arc::clone(&x));
+                let layer = LAYERS.lock().unwrap().get(x.layer_id);
+                return Some((x.layer_id, layer));
}
}
if let Some((_tag, segentry)) = self.seg_iter.next() {
@@ -426,10 +576,10 @@ mod tests {
// A helper function (closure) to pop the next oldest open entry from the layer map,
// and assert that it is what we'd expect
let mut assert_pop_layer = |expected_segno: u32, expected_generation: u64| {
-            let (l, generation) = layers.peek_oldest_open().unwrap();
+            let (layer_id, l, generation) = layers.peek_oldest_open().unwrap();
assert!(l.get_seg_tag().segno == expected_segno);
assert!(generation == expected_generation);
-            layers.pop_oldest_open();
+            layers.remove(layer_id);
};
assert_pop_layer(0, gen1); // 0x100


@@ -49,13 +49,10 @@ impl VirtualFile {
if let Some(mut file) = l.files[self.vfd].file.take() {
// return cached File
-                eprintln!("reusing {} from {}/{}", self.path.display(), self.vfd, self.tag);
file.rewind()?;
return Ok(file);
}
}
-        eprintln!("opening {}", self.path.display());
File::open(&self.path)
}
@@ -85,8 +82,6 @@ impl VirtualFile {
self.vfd = next;
self.tag = l.files[next].tag;
-        eprintln!("caching {} at {}/{}", self.path.display(), self.vfd, self.tag);
drop(l);
}
}


@@ -28,20 +28,21 @@ use std::fs::OpenOptions;
use std::io::prelude::*;
use std::io::Error;
use std::path::PathBuf;
+use std::process::{ChildStdin, ChildStdout, ChildStderr, Command};
use std::process::Stdio;
use std::sync::Mutex;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use std::time::Instant;
-use tokio::io::AsyncBufReadExt;
-use tokio::io::{AsyncReadExt, AsyncWriteExt};
-use tokio::process::{ChildStdin, ChildStdout, Command};
-use tokio::time::timeout;
use zenith_metrics::{register_histogram, register_int_counter, Histogram, IntCounter};
use zenith_utils::bin_ser::BeSer;
use zenith_utils::lsn::Lsn;
use zenith_utils::zid::ZTenantId;
+use std::os::unix::io::AsRawFd;
+use nix::poll::*;
use crate::relish::*;
use crate::repository::WALRecord;
use crate::waldecoder::XlMultiXactCreate;
@@ -140,7 +141,6 @@ pub struct PostgresRedoManager {
tenantid: ZTenantId,
conf: &'static PageServerConf,
-    runtime: tokio::runtime::Runtime,
processes: Vec<Mutex<Option<PostgresRedoProcess>>>,
next: AtomicUsize,
}
@@ -218,16 +218,12 @@ impl WalRedoManager for PostgresRedoManager {
// launch the WAL redo process on first use
if process_guard.is_none() {
-            let p = self
-                .runtime
-                .block_on(PostgresRedoProcess::launch(self.conf, process_no, &self.tenantid))?;
+            let p = PostgresRedoProcess::launch(self.conf, process_no, &self.tenantid)?;
*process_guard = Some(p);
}
let process = process_guard.as_mut().unwrap();
-        result = self
-            .runtime
-            .block_on(self.handle_apply_request_postgres(process, &request));
+        result = self.handle_apply_request_postgres(process, &request);
WAL_REDO_WAIT_TIME.observe(lock_time.duration_since(start_time).as_secs_f64());
end_time = Instant::now();
@@ -243,14 +239,6 @@ impl PostgresRedoManager {
/// Create a new PostgresRedoManager.
///
pub fn new(conf: &'static PageServerConf, tenantid: ZTenantId) -> PostgresRedoManager {
-        // We block on waiting for requests on the walredo request channel, but
-        // use async I/O to communicate with the child process. Initialize the
-        // runtime for the async part.
-        let runtime = tokio::runtime::Builder::new_current_thread()
-            .enable_all()
-            .build()
-            .unwrap();
let mut processes: Vec<Mutex<Option<PostgresRedoProcess>>> = Vec::new();
for _ in 1..10 {
processes.push(Mutex::new(None));
@@ -258,7 +246,6 @@ impl PostgresRedoManager {
// The actual process is launched lazily, on first request.
PostgresRedoManager {
-            runtime,
tenantid,
conf,
processes,
@@ -269,7 +256,7 @@ impl PostgresRedoManager {
///
/// Process one request for WAL redo using wal-redo postgres
///
-    async fn handle_apply_request_postgres(
+    fn handle_apply_request_postgres(
&self,
process: &mut PostgresRedoProcess,
request: &WalRedoRequest,
@@ -287,7 +274,7 @@ impl PostgresRedoManager {
if let RelishTag::Relation(rel) = request.rel {
// Relational WAL records are applied using wal-redo-postgres
let buf_tag = BufferTag { rel, blknum };
-            apply_result = process.apply_wal_records(buf_tag, base_img, records).await;
+            apply_result = process.apply_wal_records(buf_tag, base_img, records);
let duration = start.elapsed();
@@ -480,13 +467,14 @@ impl PostgresRedoManager {
struct PostgresRedoProcess {
stdin: ChildStdin,
stdout: ChildStdout,
+    stderr: ChildStderr,
}
impl PostgresRedoProcess {
//
// Start postgres binary in special WAL redo mode.
//
-    async fn launch(
+    fn launch(
conf: &PageServerConf,
process_no: usize,
tenantid: &ZTenantId,
@@ -511,7 +499,6 @@ impl PostgresRedoProcess {
.env("LD_LIBRARY_PATH", conf.pg_lib_dir().to_str().unwrap())
.env("DYLD_LIBRARY_PATH", conf.pg_lib_dir().to_str().unwrap())
.output()
-            .await
.expect("failed to execute initdb");
if !initdb.status.success() {
@@ -548,98 +535,106 @@ impl PostgresRedoProcess {
datadir.display()
);
-        let stdin = child.stdin.take().expect("failed to open child's stdin");
-        let stderr = child.stderr.take().expect("failed to open child's stderr");
-        let stdout = child.stdout.take().expect("failed to open child's stdout");
-
-        // This async block reads the child's stderr, and forwards it to the logger
-        let f_stderr = async {
-            let mut stderr_buffered = tokio::io::BufReader::new(stderr);
-            let mut line = String::new();
-            loop {
-                let res = stderr_buffered.read_line(&mut line).await;
-                if res.is_err() {
-                    debug!("could not convert line to utf-8");
-                    continue;
-                }
-                if res.unwrap() == 0 {
-                    break;
-                }
-                error!("wal-redo-postgres: {}", line.trim());
-                line.clear();
-            }
-            Ok::<(), Error>(())
-        };
-        tokio::spawn(f_stderr);
-
-        Ok(PostgresRedoProcess { stdin, stdout })
+        let stdin = child.stdin.take().unwrap();
+        let stdout = child.stdout.take().unwrap();
+        let stderr = child.stderr.take().unwrap();
+
+        Ok(PostgresRedoProcess { stdin, stdout, stderr })
}
//
// Apply given WAL records ('records') over an old page image. Returns
// new page image.
//
-    async fn apply_wal_records(
+    fn apply_wal_records(
&mut self,
tag: BufferTag,
base_img: Option<Bytes>,
records: &[(Lsn, WALRecord)],
) -> Result<Bytes, std::io::Error> {
-        let stdout = &mut self.stdout;
-        // Buffer the writes to avoid a lot of small syscalls.
-        let mut stdin = tokio::io::BufWriter::new(&mut self.stdin);
-
-        // We do three things simultaneously: send the old base image and WAL records to
-        // the child process's stdin, read the result from child's stdout, and forward any logging
-        // information that the child writes to its stderr to the page server's log.
-        //
-        // 'f_stdin' handles writing the base image and WAL records to the child process.
-        // 'f_stdout' below reads the result back. And 'f_stderr', which was spawned into the
-        // tokio runtime in the 'launch' function already, forwards the logging.
-        let f_stdin = async {
-            // Send base image, if any. (If the record initializes the page, previous page
-            // version is not needed.)
-            let mut buf: Vec<u8> = Vec::new();
-            build_begin_redo_for_block_msg(tag, &mut buf);
-            if let Some(img) = base_img {
-                build_push_page_msg(tag, &img, &mut buf);
-            }
-
-            // Send WAL records.
-            for (lsn, rec) in records.iter() {
-                WAL_REDO_RECORD_COUNTER.inc();
-                build_apply_record_msg(*lsn, &rec.rec, &mut buf);
-                //debug!("sent WAL record to wal redo postgres process ({:X}/{:X}",
-                //       r.lsn >> 32, r.lsn & 0xffff_ffff);
-            }
-            //debug!("sent {} WAL records to wal redo postgres process ({:X}/{:X}",
-            //       records.len(), lsn >> 32, lsn & 0xffff_ffff);
-
-            // Send GetPage command to get the result back
-            build_get_page_msg(tag, &mut buf);
-            timeout(TIMEOUT, stdin.write_all(&buf)).await??;
-            timeout(TIMEOUT, stdin.flush()).await??;
-            //debug!("sent GetPage for {}", tag.blknum);
-            Ok::<(), Error>(())
-        };
-
-        // Read back new page image
-        let f_stdout = async {
-            let mut buf = [0u8; 8192];
-            timeout(TIMEOUT, stdout.read_exact(&mut buf)).await??;
-            //debug!("got response for {}", tag.blknum);
-            Ok::<[u8; 8192], Error>(buf)
-        };
-
-        let res = tokio::try_join!(f_stdout, f_stdin)?;
-        let buf = res.0;
-        Ok::<Bytes, Error>(Bytes::from(std::vec::Vec::from(buf)))
+        // Send base image, if any. (If the record initializes the page, previous page
+        // version is not needed.)
+        let mut buf: Vec<u8> = Vec::new();
+        build_begin_redo_for_block_msg(tag, &mut buf);
+        if let Some(img) = base_img {
+            build_push_page_msg(tag, &img, &mut buf);
+        }
+
+        // Send WAL records.
+        for (lsn, rec) in records.iter() {
+            WAL_REDO_RECORD_COUNTER.inc();
+            build_apply_record_msg(*lsn, &rec.rec, &mut buf);
+            //debug!("sent WAL record to wal redo postgres process ({:X}/{:X}",
+            //       r.lsn >> 32, r.lsn & 0xffff_ffff);
+        }
+        //debug!("sent {} WAL records to wal redo postgres process ({:X}/{:X}",
+        //       records.len(), lsn >> 32, lsn & 0xffff_ffff);
+
+        // Send GetPage command to get the result back
+        build_get_page_msg(tag, &mut buf);
+
+        // The input is now in 'buf'.
+        let mut nwrite = 0;
+
+        let mut resultbuf = Vec::new();
+        resultbuf.resize(8192, 0);
+        let mut nresult = 0;
+
+        let mut pollfds = [
+            PollFd::new(self.stdout.as_raw_fd(), PollFlags::POLLIN),
+            PollFd::new(self.stderr.as_raw_fd(), PollFlags::POLLIN),
+            PollFd::new(self.stdin.as_raw_fd(), PollFlags::POLLOUT),
+        ];
+
+        // Do a blind write first
+        let n = self.stdin.write(&buf[nwrite..])?;
+        nwrite += n;
+
+        while nresult < 8192 {
+            let nfds = if nwrite < buf.len() {
+                3
+            } else {
+                2
+            };
+            nix::poll::poll(&mut pollfds[0..nfds], TIMEOUT.as_millis() as i32)?;
+
+            // We do three things simultaneously: send the old base image and WAL records to
+            // the child process's stdin, read the result from child's stdout, and forward any logging
+            // information that the child writes to its stderr to the page server's log.
+
+            if nwrite < buf.len() && !pollfds[2].revents().unwrap().is_empty() {
+                // stdin
+                let n = self.stdin.write(&buf[nwrite..])?;
+                nwrite += n;
+            }
+
+            if !pollfds[0].revents().unwrap().is_empty() {
+                // stdout
+                // Read back new page image
+                let n = self.stdout.read(&mut resultbuf[nresult..])?;
+                nresult += n;
+            }
+
+            if !pollfds[1].revents().unwrap().is_empty() {
+                // stderr
+                let mut readbuf: [u8; 16384] = [0; 16384];
+                let n = self.stderr.read(&mut readbuf)?;
+                error!("wal-redo-postgres: {}", String::from_utf8_lossy(&readbuf[0..n]));
+            }
+        }
+
+        Ok(Bytes::from(Vec::from(resultbuf)))
}
}
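The rewritten apply_wal_records drives blocking pipes with poll(): it keeps writing input while draining stdout and stderr, so neither side of the pipe can fill up and deadlock the page server against the child process. A self-contained sketch of the same pattern against a plain cat child (hypothetical demo, not from this commit; assumes the nix 0.23 API):

use std::io::{Read, Write};
use std::os::unix::io::AsRawFd;
use std::process::{Command, Stdio};

use nix::poll::{poll, PollFd, PollFlags};

fn main() -> std::io::Result<()> {
    let mut child = Command::new("cat")
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .spawn()?;
    let mut stdin = child.stdin.take().unwrap();
    let mut stdout = child.stdout.take().unwrap();

    let input = vec![b'x'; 1_000_000]; // much larger than the pipe buffer
    let mut nwrite = 0;
    let mut output = Vec::new();
    let mut readbuf = [0u8; 8192];

    // Interleave writes and reads, like the WAL-redo loop above. Writing
    // everything first would deadlock once both pipe buffers fill up.
    while nwrite < input.len() {
        let mut fds = [
            PollFd::new(stdout.as_raw_fd(), PollFlags::POLLIN),
            PollFd::new(stdin.as_raw_fd(), PollFlags::POLLOUT),
        ];
        poll(&mut fds, -1).map_err(|e| std::io::Error::from_raw_os_error(e as i32))?;

        if fds[1].revents().unwrap().contains(PollFlags::POLLOUT) {
            nwrite += stdin.write(&input[nwrite..])?;
        }
        if fds[0].revents().unwrap().contains(PollFlags::POLLIN) {
            let n = stdout.read(&mut readbuf)?;
            output.extend_from_slice(&readbuf[..n]);
        }
    }

    drop(stdin); // close the pipe so the child sees EOF
    stdout.read_to_end(&mut output)?;
    assert_eq!(output.len(), input.len());
    child.wait()?;
    Ok(())
}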