Put a global limit on memory used by in-memory layers.

Adds simple global tracking of memory used by the in-memory layers. It's
very approximate, it doesn't take into account allocator, memory
fragmentation or many other things, but it's a good first step.

After storing a WAL record in the repository, the WAL receiver checks
the global memory usage. If it's above a configurable threshold
(`open_mem_limit`, 128 MB by default), it evicts a layer. The victim layer
is chosen by the GClock algorithm, similar to the one used in the Postgres
buffer cache.

This stops the page server from using an unbounded amount of memory. It's
pretty crude: evicting, materializing and writing a layer to disk all
happen in the WAL receiver thread for now. It would be nice to move that
to a background thread, and it would be nice to have a smarter policy on
when to materialize a new image layer and when to just write out a delta
layer, and it would be nice to have more accurate accounting of memory.
But this should fix the most pressing OOM issues, and is a step in the
right direction.

Co-authored-by: Patrick Insinger <patrickinsinger@gmail.com>
This commit is contained in:
Heikki Linnakangas
2021-11-02 15:49:39 +02:00
parent 8c6d2664c0
commit fb524dd973
10 changed files with 276 additions and 44 deletions

View File

@@ -43,6 +43,7 @@ struct CfgFileParams {
checkpoint_period: Option<String>,
gc_horizon: Option<String>,
gc_period: Option<String>,
open_mem_limit: Option<String>,
pg_distrib_dir: Option<String>,
auth_validation_public_key_path: Option<String>,
auth_type: Option<String>,
@@ -104,6 +105,7 @@ impl CfgFileParams {
checkpoint_period: get_arg("checkpoint_period"),
gc_horizon: get_arg("gc_horizon"),
gc_period: get_arg("gc_period"),
open_mem_limit: get_arg("open_mem_limit"),
pg_distrib_dir: get_arg("postgres-distrib"),
auth_validation_public_key_path: get_arg("auth-validation-public-key-path"),
auth_type: get_arg("auth-type"),
@@ -122,6 +124,7 @@ impl CfgFileParams {
checkpoint_period: self.checkpoint_period.or(other.checkpoint_period),
gc_horizon: self.gc_horizon.or(other.gc_horizon),
gc_period: self.gc_period.or(other.gc_period),
open_mem_limit: self.open_mem_limit.or(other.open_mem_limit),
pg_distrib_dir: self.pg_distrib_dir.or(other.pg_distrib_dir),
auth_validation_public_key_path: self
.auth_validation_public_key_path
@@ -166,6 +169,11 @@ impl CfgFileParams {
None => DEFAULT_GC_PERIOD,
};
let open_mem_limit: usize = match self.open_mem_limit.as_ref() {
Some(open_mem_limit_str) => open_mem_limit_str.parse()?,
None => DEFAULT_OPEN_MEM_LIMIT,
};
let pg_distrib_dir = match self.pg_distrib_dir.as_ref() {
Some(pg_distrib_dir_str) => PathBuf::from(pg_distrib_dir_str),
None => env::current_dir()?.join("tmp_install"),
@@ -237,6 +245,7 @@ impl CfgFileParams {
checkpoint_period,
gc_horizon,
gc_period,
open_mem_limit,
superuser: String::from(DEFAULT_SUPERUSER),
@@ -307,6 +316,12 @@ fn main() -> Result<()> {
.takes_value(true)
.help("Interval between garbage collector iterations"),
)
.arg(
Arg::with_name("open_mem_limit")
.long("open_mem_limit")
.takes_value(true)
.help("Amount of memory reserved for buffering incoming WAL"),
)
.arg(
Arg::with_name("workdir")
.short("D")
@@ -601,6 +616,7 @@ mod tests {
checkpoint_period: Some("checkpoint_period_VALUE".to_string()),
gc_horizon: Some("gc_horizon_VALUE".to_string()),
gc_period: Some("gc_period_VALUE".to_string()),
open_mem_limit: Some("open_mem_limit_VALUE".to_string()),
pg_distrib_dir: Some("pg_distrib_dir_VALUE".to_string()),
auth_validation_public_key_path: Some(
"auth_validation_public_key_path_VALUE".to_string(),
@@ -624,6 +640,7 @@ checkpoint_distance = 'checkpoint_distance_VALUE'
checkpoint_period = 'checkpoint_period_VALUE'
gc_horizon = 'gc_horizon_VALUE'
gc_period = 'gc_period_VALUE'
open_mem_limit = 'open_mem_limit_VALUE'
pg_distrib_dir = 'pg_distrib_dir_VALUE'
auth_validation_public_key_path = 'auth_validation_public_key_path_VALUE'
auth_type = 'auth_type_VALUE'
@@ -658,6 +675,7 @@ local_path = 'relish_storage_local_VALUE'
checkpoint_period: Some("checkpoint_period_VALUE".to_string()),
gc_horizon: Some("gc_horizon_VALUE".to_string()),
gc_period: Some("gc_period_VALUE".to_string()),
open_mem_limit: Some("open_mem_limit_VALUE".to_string()),
pg_distrib_dir: Some("pg_distrib_dir_VALUE".to_string()),
auth_validation_public_key_path: Some(
"auth_validation_public_key_path_VALUE".to_string(),
@@ -684,6 +702,7 @@ checkpoint_distance = 'checkpoint_distance_VALUE'
checkpoint_period = 'checkpoint_period_VALUE'
gc_horizon = 'gc_horizon_VALUE'
gc_period = 'gc_period_VALUE'
open_mem_limit = 'open_mem_limit_VALUE'
pg_distrib_dir = 'pg_distrib_dir_VALUE'
auth_validation_public_key_path = 'auth_validation_public_key_path_VALUE'
auth_type = 'auth_type_VALUE'

View File

@@ -65,6 +65,7 @@ mod storage_layer;
use delta_layer::DeltaLayer;
use image_layer::ImageLayer;
use global_layer_map::{LayerId, GLOBAL_LAYER_MAP};
use inmemory_layer::InMemoryLayer;
use layer_map::LayerMap;
use storage_layer::{
@@ -799,6 +800,10 @@ impl Timeline for LayeredTimeline {
_write_guard: self.write_lock.lock().unwrap(),
})
}
/// `LayeredTimeline` is already the concrete type, so the "cast" simply returns `self`.
fn upgrade_to_layered_timeline(&self) -> &crate::layered_repository::LayeredTimeline {
self
}
}
impl LayeredTimeline {
@@ -1140,9 +1145,8 @@ impl LayeredTimeline {
// a lot of memory and/or aren't receiving much updates anymore.
let mut disk_consistent_lsn = last_record_lsn;
let mut created_historics = false;
let mut layer_uploads = Vec::new();
while let Some((oldest_slot_id, oldest_layer, oldest_generation)) =
while let Some((oldest_layer_id, oldest_layer, oldest_generation)) =
layers.peek_oldest_open()
{
let oldest_pending_lsn = oldest_layer.get_oldest_pending_lsn();
@@ -1169,42 +1173,14 @@ impl LayeredTimeline {
break;
}
// Mark the layer as no longer accepting writes and record the end_lsn.
// This happens in-place, no new layers are created now.
// We call `get_last_record_lsn` again, which may be different from the
// original load, as we may have released the write lock since then.
oldest_layer.freeze(self.get_last_record_lsn());
// The layer is no longer open, update the layer map to reflect this.
// We will replace it with on-disk historics below.
layers.remove_open(oldest_slot_id);
layers.insert_historic(oldest_layer.clone());
// Write the now-frozen layer to disk. That could take a while, so release the lock while do it
drop(layers);
drop(write_guard);
let new_historics = oldest_layer.write_to_disk(self)?;
let mut this_layer_uploads = self.evict_layer(oldest_layer_id)?;
layer_uploads.append(&mut this_layer_uploads);
write_guard = self.write_lock.lock().unwrap();
layers = self.layers.lock().unwrap();
if !new_historics.is_empty() {
created_historics = true;
}
// Finally, replace the frozen in-memory layer with the new on-disk layers
layers.remove_historic(oldest_layer);
// Add the historics to the LayerMap
for delta_layer in new_historics.delta_layers {
layer_uploads.push(delta_layer.path());
layers.insert_historic(Arc::new(delta_layer));
}
for image_layer in new_historics.image_layers {
layer_uploads.push(image_layer.path());
layers.insert_historic(Arc::new(image_layer));
}
}
// Call unload() on all frozen layers, to release memory.
@@ -1217,7 +1193,7 @@ impl LayeredTimeline {
drop(layers);
drop(write_guard);
if created_historics {
if !layer_uploads.is_empty() {
// We must fsync the timeline dir to ensure the directory entries for
// new layer files are durable
let timeline_dir =
@@ -1270,6 +1246,55 @@ impl LayeredTimeline {
Ok(())
}
/// Freeze the open in-memory layer identified by `layer_id`, write it to disk, and
/// replace it in this timeline's layer map with the resulting on-disk layers.
///
/// Returns the paths of the delta and image layers that were created. If `layer_id`
/// no longer resolves to a layer in [`GLOBAL_LAYER_MAP`], nothing is done and an
/// empty vector is returned.
///
/// Errors from `write_to_disk` are propagated. Panics if `write_lock`, `layers` or
/// `GLOBAL_LAYER_MAP` is poisoned.
///
/// NOTE(review): this function acquires `write_lock` and `layers` itself, so callers
/// presumably must not be holding either of them when calling it -- confirm.
fn evict_layer(&self, layer_id: LayerId) -> Result<Vec<PathBuf>> {
// Lock order: the write lock first, then the layer map.
let mut write_guard = self.write_lock.lock().unwrap();
let mut layers = self.layers.lock().unwrap();
let mut layer_uploads = Vec::new();
let global_layer_map = GLOBAL_LAYER_MAP.read().unwrap();
if let Some(oldest_layer) = global_layer_map.get(&layer_id) {
// We hold our own reference to the layer now, so the global registry lock
// can be released right away.
drop(global_layer_map);
// Mark the layer as no longer accepting writes and record the end_lsn.
// This happens in-place, no new layers are created now.
// The end LSN is read here, under the write lock, rather than taken from the
// caller: a value the caller loaded earlier may be out of date, as the write
// lock may have been released since then.
oldest_layer.freeze(self.get_last_record_lsn());
// The layer is no longer open, update the layer map to reflect this.
// We will replace it with on-disk historics below.
layers.remove_open(layer_id);
layers.insert_historic(oldest_layer.clone());
// Write the now-frozen layer to disk. That could take a while, so release the
// locks while we do it.
drop(layers);
drop(write_guard);
let new_historics = oldest_layer.write_to_disk(self)?;
// Re-acquire both locks, in the same order as above, to update the layer map.
write_guard = self.write_lock.lock().unwrap();
layers = self.layers.lock().unwrap();
// Finally, replace the frozen in-memory layer with the new on-disk layers
layers.remove_historic(oldest_layer);
// Add the historics to the LayerMap, remembering their paths for the caller
for delta_layer in new_historics.delta_layers {
layer_uploads.push(delta_layer.path());
layers.insert_historic(Arc::new(delta_layer));
}
for image_layer in new_historics.image_layers {
layer_uploads.push(image_layer.path());
layers.insert_historic(Arc::new(image_layer));
}
}
drop(layers);
drop(write_guard);
Ok(layer_uploads)
}
///
/// Garbage collect layer files on a timeline that are no longer needed.
///
@@ -1837,3 +1862,32 @@ fn rename_to_backup(path: PathBuf) -> anyhow::Result<()> {
path
))
}
//----- Global layer management
/// Check if too much memory is being used by open layers. If so, evict open
/// layers, one at a time, until the global usage no longer exceeds
/// `conf.open_mem_limit`.
///
/// Each victim is picked by `find_victim_if_needed` on [`GLOBAL_LAYER_MAP`] and
/// then evicted through the timeline it reports belonging to.
///
/// Returns an error if the victim's timeline cannot be looked up or if evicting
/// the layer fails. Panics if `GLOBAL_LAYER_MAP` is poisoned.
pub fn evict_layer_if_needed(conf: &PageServerConf) -> Result<()> {
    loop {
        // The registry read guard is a temporary of this statement, so it is
        // released before the eviction below runs.
        let victim = GLOBAL_LAYER_MAP
            .read()
            .unwrap()
            .find_victim_if_needed(conf.open_mem_limit);

        // No victim means we are at or below the limit (or nothing is open).
        let (layer_id, layer) = match victim {
            Some(found) => found,
            None => return Ok(()),
        };

        let tenantid = layer.get_tenant_id();
        let timelineid = layer.get_timeline_id();

        let _entered =
            info_span!("global evict", timeline = %timelineid, tenant = %tenantid).entered();
        info!("evicting {}", layer.filename().display());

        // Only the ID is needed from here on; let go of our layer reference.
        drop(layer);

        let timeline = tenant_mgr::get_timeline_for_tenant(tenantid, timelineid)?;
        timeline
            .upgrade_to_layered_timeline()
            .evict_layer(layer_id)?;
    }
}

View File

@@ -148,6 +148,10 @@ pub struct DeltaLayerInner {
}
impl Layer for DeltaLayer {
/// Returns the tenant ID stored in this delta layer (`tenantid`).
fn get_tenant_id(&self) -> ZTenantId {
self.tenantid
}
fn get_timeline_id(&self) -> ZTimelineId {
self.timelineid
}

View File

@@ -2,22 +2,36 @@
//! Global registry of open layers.
//!
//! Whenever a new in-memory layer is created to hold incoming WAL, it is registered
//! in [`GLOBAL_LAYER_MAP`], so that we can keep track of the total number of in-memory layers
//! in the system, and know when we need to evict some to release memory.
//! in [`GLOBAL_LAYER_MAP`], so that we can keep track of the total number of
//! in-memory layers in the system, and know when we need to evict some to release
//! memory.
//!
//! Each layer is assigned a unique ID when it's registered in the global registry.
//! The ID can be used to relocate the layer later, without having to hold locks.
//!
use std::sync::atomic::{AtomicU8, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use super::inmemory_layer::InMemoryLayer;
use lazy_static::lazy_static;
const MAX_USAGE_COUNT: u8 = 5;
lazy_static! {
pub static ref GLOBAL_LAYER_MAP: RwLock<OpenLayers> = RwLock::new(OpenLayers::default());
}
///
/// How much memory, in bytes, is being used by all the open layers? This is used to
/// trigger freezing and evicting an open layer to disk.
///
/// This is only a rough approximation, it leaves out a lot of things like malloc()
/// overhead. But as long as there is enough "slop" and the limit is not set too close
/// to the RAM size on the system, it's good enough.
pub static GLOBAL_OPEN_MEM_USAGE: AtomicUsize = AtomicUsize::new(0);
// TODO these types can probably be smaller
#[derive(PartialEq, Eq, Clone, Copy)]
pub struct LayerId {
@@ -35,11 +49,14 @@ enum SlotData {
/// One entry in the `OpenLayers` slot array.
struct Slot {
/// Copied into the `LayerId` handed out for this slot, and incremented (wrapping)
/// whenever the slot is vacated.
/// NOTE(review): presumably lets a stale `LayerId` be told apart from the slot's
/// current occupant after reuse -- confirm against the lookup code.
tag: u64,
/// Either the layer occupying this slot or, when vacant, the link to the next
/// slot in the free list (if any).
data: SlotData,
/// Set to 1 when a layer is stored in the slot, bumped (up to `MAX_USAGE_COUNT`)
/// when the layer is looked up, and decremented by the victim search.
usage_count: AtomicU8, // for clock algorithm
}
#[derive(Default)]
pub struct OpenLayers {
slots: Vec<Slot>,
num_occupied: usize,
next_victim: AtomicUsize,
// Head of free-slot list.
next_empty_slot_idx: Option<usize>,
@@ -54,21 +71,29 @@ impl OpenLayers {
self.slots.push(Slot {
tag: 0,
data: SlotData::Vacant(None),
usage_count: AtomicU8::new(0),
});
idx
}
};
let slots_len = self.slots.len();
let slot = &mut self.slots[slot_idx];
match slot.data {
SlotData::Occupied(_) => unimplemented!(),
SlotData::Occupied(_) => {
panic!("an occupied slot was in the free list");
}
SlotData::Vacant(next_empty_slot_idx) => {
self.next_empty_slot_idx = next_empty_slot_idx;
}
}
slot.data = SlotData::Occupied(layer);
slot.usage_count.store(1, Ordering::Relaxed);
self.num_occupied += 1;
assert!(self.num_occupied <= slots_len);
LayerId {
index: slot_idx,
@@ -83,12 +108,81 @@ impl OpenLayers {
}
if let SlotData::Occupied(layer) = &slot.data {
let _ = slot.usage_count.fetch_update(
Ordering::Relaxed,
Ordering::Relaxed,
|old_usage_count| {
if old_usage_count < MAX_USAGE_COUNT {
Some(old_usage_count + 1)
} else {
None
}
},
);
Some(Arc::clone(layer))
} else {
None
}
}
/// Pick a layer to evict, but only when the open layers are over budget.
///
/// Compares [`GLOBAL_OPEN_MEM_USAGE`] against `limit`: if the usage is at or
/// below `limit` this returns `None`, otherwise it returns whatever
/// `find_victim` selects.
pub fn find_victim_if_needed(&self, limit: usize) -> Option<(LayerId, Arc<InMemoryLayer>)> {
    if GLOBAL_OPEN_MEM_USAGE.load(Ordering::Relaxed) <= limit {
        return None;
    }
    self.find_victim()
}
/// Choose an eviction victim among the occupied slots with a clock sweep.
///
/// Returns `None` when no slot is occupied. Otherwise the clock hand is advanced
/// one slot at a time: every occupied slot it passes with a non-zero usage count
/// has that count decremented, and the first occupied slot found with a usage
/// count of zero is returned, as its `LayerId` plus a new reference to the layer.
pub fn find_victim(&self) -> Option<(LayerId, Arc<InMemoryLayer>)> {
    if self.num_occupied == 0 {
        return None;
    }

    // Run the clock algorithm.
    //
    // FIXME: It's theoretically possible that a constant stream of get() requests
    // comes in faster than we advance the clock hand, so that this never finishes.
    loop {
        // FIXME: Because we interpret the clock hand variable modulo slots.len(), the
        // hand effectively jumps to a more or less random place whenever the array is
        // expanded. That's relatively harmless, it just leads to a non-optimal choice
        // of victim. Also, in a server that runs for long enough, the array should reach
        // a steady-state size and not grow anymore.
        let hand = self.next_victim.fetch_add(1, Ordering::Relaxed) % self.slots.len();
        let slot = &self.slots[hand];

        // Vacant slots are simply skipped.
        let layer = if let SlotData::Occupied(layer) = &slot.data {
            layer
        } else {
            continue;
        };

        // Decrement the usage count unless it is already zero; `checked_sub`
        // yields `None` at zero, which makes `fetch_update` report `Err`.
        let decremented = slot.usage_count.fetch_update(
            Ordering::Relaxed,
            Ordering::Relaxed,
            |count| count.checked_sub(1),
        );
        if decremented.is_ok() {
            continue;
        }

        // Found a slot with usage_count == 0. Return it.
        let victim_id = LayerId {
            index: hand,
            tag: slot.tag,
        };
        return Some((victim_id, Arc::clone(layer)));
    }
}
// TODO this won't be a public API in the future
pub fn remove(&mut self, layer_id: &LayerId) {
let slot = &mut self.slots[layer_id.index];
@@ -107,6 +201,9 @@ impl OpenLayers {
slot.data = SlotData::Vacant(self.next_empty_slot_idx);
self.next_empty_slot_idx = Some(layer_id.index);
assert!(self.num_occupied > 0);
self.num_occupied -= 1;
slot.tag = slot.tag.wrapping_add(1);
}
}

View File

@@ -117,6 +117,10 @@ impl Layer for ImageLayer {
PathBuf::from(self.layer_name().to_string())
}
/// Returns the tenant ID stored in this image layer (`tenantid`).
fn get_tenant_id(&self) -> ZTenantId {
self.tenantid
}
fn get_timeline_id(&self) -> ZTimelineId {
self.timelineid
}

View File

@@ -3,6 +3,7 @@
//! are held in a BTreeMap, and there's another BTreeMap to track the size of the relation.
//!
use crate::layered_repository::filename::DeltaFileName;
use crate::layered_repository::global_layer_map::GLOBAL_OPEN_MEM_USAGE;
use crate::layered_repository::storage_layer::{
Layer, PageReconstructData, PageReconstructResult, PageVersion, SegmentTag, RELISH_SEG_SIZE,
};
@@ -16,6 +17,7 @@ use anyhow::{bail, ensure, Result};
use bytes::Bytes;
use log::*;
use std::path::PathBuf;
use std::sync::atomic::Ordering;
use std::sync::{Arc, RwLock};
use zenith_utils::vec_map::VecMap;
@@ -69,6 +71,15 @@ pub struct InMemoryLayerInner {
/// a non-blocky rel, 'segsizes' is not used and is always empty.
///
segsizes: VecMap<Lsn, u32>,
/// Approximate amount of memory used by this layer.
///
/// TODO: This is currently a very crude metric, we don't take into account allocator
/// overhead, memory fragmentation, memory used by the VecMaps, nor many other things.
/// Just the actual # of bytes of a page image (8 kB) or the size of a WAL record.
///
/// Whenever this is changed, you must also modify GLOBAL_OPEN_MEM_USAGE accordingly!
mem_usage: usize,
}
impl InMemoryLayerInner {
@@ -89,6 +100,15 @@ impl InMemoryLayerInner {
}
}
impl Drop for InMemoryLayerInner {
    /// Give back whatever this layer still has accounted in
    /// [`GLOBAL_OPEN_MEM_USAGE`], so the global counter does not keep counting
    /// memory for a layer that no longer exists.
    fn drop(&mut self) {
        // Take the remaining amount and leave zero behind, so the local counter
        // and the global one are adjusted together.
        let still_accounted = std::mem::take(&mut self.mem_usage);
        if still_accounted > 0 {
            GLOBAL_OPEN_MEM_USAGE.fetch_sub(still_accounted, Ordering::Relaxed);
        }
    }
}
impl Layer for InMemoryLayer {
// An in-memory layer doesn't really have a filename as it's not stored on disk,
// but we construct a filename as if it was a delta layer
@@ -113,6 +133,10 @@ impl Layer for InMemoryLayer {
PathBuf::from(format!("inmem-{}", delta_filename))
}
/// Returns the tenant ID stored in this in-memory layer (`tenantid`).
fn get_tenant_id(&self) -> ZTenantId {
self.tenantid
}
fn get_timeline_id(&self) -> ZTimelineId {
self.timelineid
}
@@ -281,12 +305,6 @@ pub struct LayersOnDisk {
pub image_layers: Vec<ImageLayer>,
}
impl LayersOnDisk {
pub fn is_empty(&self) -> bool {
self.delta_layers.is_empty() && self.image_layers.is_empty()
}
}
impl InMemoryLayer {
/// Return the oldest page version that's stored in this layer
pub fn get_oldest_pending_lsn(&self) -> Lsn {
@@ -330,6 +348,7 @@ impl InMemoryLayer {
dropped: false,
page_versions: PageVersions::default(),
segsizes,
mem_usage: 0,
}),
})
}
@@ -376,6 +395,13 @@ impl InMemoryLayer {
inner.assert_writeable();
let mut mem_usage = 0;
if let Some(img) = &pv.page_image {
mem_usage += img.len();
} else if let Some(rec) = &pv.record {
mem_usage += rec.rec.len();
}
let old = inner.page_versions.append_or_update_last(blknum, lsn, pv);
if old.is_some() {
@@ -386,6 +412,9 @@ impl InMemoryLayer {
);
}
inner.mem_usage += mem_usage;
GLOBAL_OPEN_MEM_USAGE.fetch_add(mem_usage, Ordering::Relaxed);
// Also update the relation size, if this extended the relation.
if self.seg.rel.is_blocky() {
let newsize = blknum - self.seg.segno * RELISH_SEG_SIZE + 1;
@@ -521,6 +550,7 @@ impl InMemoryLayer {
dropped: false,
page_versions: PageVersions::default(),
segsizes,
mem_usage: 0,
}),
})
}
@@ -550,6 +580,16 @@ impl InMemoryLayer {
for (_blk, lsn, _pv) in inner.page_versions.ordered_page_version_iter(None) {
assert!(lsn <= end_lsn);
}
// It's a bit premature to subtract the global mem usage here already.
// This layer consumes memory until it's written out to disk and dropped.
// But GLOBAL_OPEN_MEM_USAGE is used to trigger layer eviction, if there are
// too many open layers, and from that point of view this should no longer be
// counted against the global mem usage.
if inner.mem_usage > 0 {
GLOBAL_OPEN_MEM_USAGE.fetch_sub(inner.mem_usage, Ordering::Relaxed);
inner.mem_usage = 0;
}
}
}

View File

@@ -4,7 +4,7 @@
use crate::relish::RelishTag;
use crate::repository::WALRecord;
use crate::ZTimelineId;
use crate::{ZTenantId, ZTimelineId};
use anyhow::Result;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
@@ -104,6 +104,8 @@ pub enum PageReconstructResult {
/// in-memory and on-disk layers.
///
pub trait Layer: Send + Sync {
fn get_tenant_id(&self) -> ZTenantId;
/// Identify the timeline this relish belongs to
fn get_timeline_id(&self) -> ZTimelineId;

View File

@@ -43,6 +43,8 @@ pub mod defaults {
pub const DEFAULT_SUPERUSER: &str = "zenith_admin";
pub const DEFAULT_RELISH_STORAGE_MAX_CONCURRENT_SYNC_LIMITS: usize = 100;
pub const DEFAULT_OPEN_MEM_LIMIT: usize = 128 * 1024 * 1024;
}
lazy_static! {
@@ -71,6 +73,8 @@ pub struct PageServerConf {
pub gc_period: Duration,
pub superuser: String,
pub open_mem_limit: usize,
// Repository directory, relative to current working directory.
// Normally, the page server changes the current working directory
// to the repository, and 'workdir' is always '.'. But we don't do
@@ -153,6 +157,7 @@ impl PageServerConf {
checkpoint_period: Duration::from_secs(10),
gc_horizon: defaults::DEFAULT_GC_HORIZON,
gc_period: Duration::from_secs(10),
open_mem_limit: defaults::DEFAULT_OPEN_MEM_LIMIT,
listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
superuser: "zenith_admin".to_string(),

View File

@@ -160,6 +160,9 @@ pub trait Timeline: Send + Sync {
/// Does the same as get_current_logical_size but counted on demand.
/// Used in tests to ensure thet incremental and non incremental variants match.
fn get_current_logical_size_non_incremental(&self, lsn: Lsn) -> Result<usize>;
/// An escape hatch to allow "casting" a generic Timeline to LayeredTimeline.
fn upgrade_to_layered_timeline(&self) -> &crate::layered_repository::LayeredTimeline;
}
/// Various functions to mutate the timeline.

View File

@@ -5,6 +5,7 @@
//!
//! We keep one WAL receiver active per timeline.
use crate::layered_repository;
use crate::relish::*;
use crate::restore_local_repo;
use crate::tenant_mgr;
@@ -175,7 +176,7 @@ fn thread_main(conf: &'static PageServerConf, timelineid: ZTimelineId, tenantid:
}
fn walreceiver_main(
_conf: &PageServerConf,
conf: &PageServerConf,
timelineid: ZTimelineId,
wal_producer_connstr: &str,
tenantid: ZTenantId,
@@ -295,6 +296,9 @@ fn walreceiver_main(
caught_up = true;
}
// Release memory if needed
layered_repository::evict_layer_if_needed(conf)?;
Some(endlsn)
}