diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index aae37a29c8..735eb04973 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -43,6 +43,7 @@ struct CfgFileParams { checkpoint_period: Option, gc_horizon: Option, gc_period: Option, + open_mem_limit: Option, pg_distrib_dir: Option, auth_validation_public_key_path: Option, auth_type: Option, @@ -104,6 +105,7 @@ impl CfgFileParams { checkpoint_period: get_arg("checkpoint_period"), gc_horizon: get_arg("gc_horizon"), gc_period: get_arg("gc_period"), + open_mem_limit: get_arg("open_mem_limit"), pg_distrib_dir: get_arg("postgres-distrib"), auth_validation_public_key_path: get_arg("auth-validation-public-key-path"), auth_type: get_arg("auth-type"), @@ -122,6 +124,7 @@ impl CfgFileParams { checkpoint_period: self.checkpoint_period.or(other.checkpoint_period), gc_horizon: self.gc_horizon.or(other.gc_horizon), gc_period: self.gc_period.or(other.gc_period), + open_mem_limit: self.open_mem_limit.or(other.open_mem_limit), pg_distrib_dir: self.pg_distrib_dir.or(other.pg_distrib_dir), auth_validation_public_key_path: self .auth_validation_public_key_path @@ -166,6 +169,11 @@ impl CfgFileParams { None => DEFAULT_GC_PERIOD, }; + let open_mem_limit: usize = match self.open_mem_limit.as_ref() { + Some(open_mem_limit_str) => open_mem_limit_str.parse()?, + None => DEFAULT_OPEN_MEM_LIMIT, + }; + let pg_distrib_dir = match self.pg_distrib_dir.as_ref() { Some(pg_distrib_dir_str) => PathBuf::from(pg_distrib_dir_str), None => env::current_dir()?.join("tmp_install"), @@ -237,6 +245,7 @@ impl CfgFileParams { checkpoint_period, gc_horizon, gc_period, + open_mem_limit, superuser: String::from(DEFAULT_SUPERUSER), @@ -307,6 +316,12 @@ fn main() -> Result<()> { .takes_value(true) .help("Interval between garbage collector iterations"), ) + .arg( + Arg::with_name("open_mem_limit") + .long("open_mem_limit") + .takes_value(true) + .help("Amount of memory reserved for buffering incoming WAL"), + ) .arg( Arg::with_name("workdir") .short("D") @@ -601,6 +616,7 @@ mod tests { checkpoint_period: Some("checkpoint_period_VALUE".to_string()), gc_horizon: Some("gc_horizon_VALUE".to_string()), gc_period: Some("gc_period_VALUE".to_string()), + open_mem_limit: Some("open_mem_limit_VALUE".to_string()), pg_distrib_dir: Some("pg_distrib_dir_VALUE".to_string()), auth_validation_public_key_path: Some( "auth_validation_public_key_path_VALUE".to_string(), @@ -624,6 +640,7 @@ checkpoint_distance = 'checkpoint_distance_VALUE' checkpoint_period = 'checkpoint_period_VALUE' gc_horizon = 'gc_horizon_VALUE' gc_period = 'gc_period_VALUE' +open_mem_limit = 'open_mem_limit_VALUE' pg_distrib_dir = 'pg_distrib_dir_VALUE' auth_validation_public_key_path = 'auth_validation_public_key_path_VALUE' auth_type = 'auth_type_VALUE' @@ -658,6 +675,7 @@ local_path = 'relish_storage_local_VALUE' checkpoint_period: Some("checkpoint_period_VALUE".to_string()), gc_horizon: Some("gc_horizon_VALUE".to_string()), gc_period: Some("gc_period_VALUE".to_string()), + open_mem_limit: Some("open_mem_limit_VALUE".to_string()), pg_distrib_dir: Some("pg_distrib_dir_VALUE".to_string()), auth_validation_public_key_path: Some( "auth_validation_public_key_path_VALUE".to_string(), @@ -684,6 +702,7 @@ checkpoint_distance = 'checkpoint_distance_VALUE' checkpoint_period = 'checkpoint_period_VALUE' gc_horizon = 'gc_horizon_VALUE' gc_period = 'gc_period_VALUE' +open_mem_limit = 'open_mem_limit_VALUE' pg_distrib_dir = 'pg_distrib_dir_VALUE' auth_validation_public_key_path = 'auth_validation_public_key_path_VALUE' auth_type = 'auth_type_VALUE' diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index ef4ac5cc7e..decb6faf0c 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -65,6 +65,7 @@ mod storage_layer; use delta_layer::DeltaLayer; use image_layer::ImageLayer; +use global_layer_map::{LayerId, GLOBAL_LAYER_MAP}; use inmemory_layer::InMemoryLayer; use layer_map::LayerMap; use storage_layer::{ @@ -799,6 +800,10 @@ impl Timeline for LayeredTimeline { _write_guard: self.write_lock.lock().unwrap(), }) } + + fn upgrade_to_layered_timeline(&self) -> &crate::layered_repository::LayeredTimeline { + self + } } impl LayeredTimeline { @@ -1140,9 +1145,8 @@ impl LayeredTimeline { // a lot of memory and/or aren't receiving much updates anymore. let mut disk_consistent_lsn = last_record_lsn; - let mut created_historics = false; let mut layer_uploads = Vec::new(); - while let Some((oldest_slot_id, oldest_layer, oldest_generation)) = + while let Some((oldest_layer_id, oldest_layer, oldest_generation)) = layers.peek_oldest_open() { let oldest_pending_lsn = oldest_layer.get_oldest_pending_lsn(); @@ -1169,42 +1173,14 @@ impl LayeredTimeline { break; } - // Mark the layer as no longer accepting writes and record the end_lsn. - // This happens in-place, no new layers are created now. - // We call `get_last_record_lsn` again, which may be different from the - // original load, as we may have released the write lock since then. - oldest_layer.freeze(self.get_last_record_lsn()); - - // The layer is no longer open, update the layer map to reflect this. - // We will replace it with on-disk historics below. - layers.remove_open(oldest_slot_id); - layers.insert_historic(oldest_layer.clone()); - - // Write the now-frozen layer to disk. That could take a while, so release the lock while do it drop(layers); drop(write_guard); - let new_historics = oldest_layer.write_to_disk(self)?; + let mut this_layer_uploads = self.evict_layer(oldest_layer_id)?; + layer_uploads.append(&mut this_layer_uploads); write_guard = self.write_lock.lock().unwrap(); layers = self.layers.lock().unwrap(); - - if !new_historics.is_empty() { - created_historics = true; - } - - // Finally, replace the frozen in-memory layer with the new on-disk layers - layers.remove_historic(oldest_layer); - - // Add the historics to the LayerMap - for delta_layer in new_historics.delta_layers { - layer_uploads.push(delta_layer.path()); - layers.insert_historic(Arc::new(delta_layer)); - } - for image_layer in new_historics.image_layers { - layer_uploads.push(image_layer.path()); - layers.insert_historic(Arc::new(image_layer)); - } } // Call unload() on all frozen layers, to release memory. @@ -1217,7 +1193,7 @@ impl LayeredTimeline { drop(layers); drop(write_guard); - if created_historics { + if !layer_uploads.is_empty() { // We must fsync the timeline dir to ensure the directory entries for // new layer files are durable let timeline_dir = @@ -1270,6 +1246,55 @@ impl LayeredTimeline { Ok(()) } + fn evict_layer(&self, layer_id: LayerId) -> Result> { + // Mark the layer as no longer accepting writes and record the end_lsn. + // This happens in-place, no new layers are created now. + // We call `get_last_record_lsn` again, which may be different from the + // original load, as we may have released the write lock since then. + + let mut write_guard = self.write_lock.lock().unwrap(); + let mut layers = self.layers.lock().unwrap(); + + let mut layer_uploads = Vec::new(); + + let global_layer_map = GLOBAL_LAYER_MAP.read().unwrap(); + if let Some(oldest_layer) = global_layer_map.get(&layer_id) { + drop(global_layer_map); + oldest_layer.freeze(self.get_last_record_lsn()); + + // The layer is no longer open, update the layer map to reflect this. + // We will replace it with on-disk historics below. + layers.remove_open(layer_id); + layers.insert_historic(oldest_layer.clone()); + + // Write the now-frozen layer to disk. That could take a while, so release the lock while do it + drop(layers); + drop(write_guard); + + let new_historics = oldest_layer.write_to_disk(self)?; + + write_guard = self.write_lock.lock().unwrap(); + layers = self.layers.lock().unwrap(); + + // Finally, replace the frozen in-memory layer with the new on-disk layers + layers.remove_historic(oldest_layer); + + // Add the historics to the LayerMap + for delta_layer in new_historics.delta_layers { + layer_uploads.push(delta_layer.path()); + layers.insert_historic(Arc::new(delta_layer)); + } + for image_layer in new_historics.image_layers { + layer_uploads.push(image_layer.path()); + layers.insert_historic(Arc::new(image_layer)); + } + } + drop(layers); + drop(write_guard); + + Ok(layer_uploads) + } + /// /// Garbage collect layer files on a timeline that are no longer needed. /// @@ -1837,3 +1862,32 @@ fn rename_to_backup(path: PathBuf) -> anyhow::Result<()> { path )) } + +//----- Global layer management + +/// Check if too much memory is being used by open layers. If so, evict +pub fn evict_layer_if_needed(conf: &PageServerConf) -> Result<()> { + // Keep evicting layers until we are below the memory threshold. + let mut global_layer_map = GLOBAL_LAYER_MAP.read().unwrap(); + while let Some((layer_id, layer)) = global_layer_map.find_victim_if_needed(conf.open_mem_limit) + { + drop(global_layer_map); + let tenantid = layer.get_tenant_id(); + let timelineid = layer.get_timeline_id(); + + let _entered = + info_span!("global evict", timeline = %timelineid, tenant = %tenantid).entered(); + info!("evicting {}", layer.filename().display()); + drop(layer); + + let timeline = tenant_mgr::get_timeline_for_tenant(tenantid, timelineid)?; + + timeline + .upgrade_to_layered_timeline() + .evict_layer(layer_id)?; + + global_layer_map = GLOBAL_LAYER_MAP.read().unwrap(); + } + + Ok(()) +} diff --git a/pageserver/src/layered_repository/delta_layer.rs b/pageserver/src/layered_repository/delta_layer.rs index 736a2694bf..182e3e6c5c 100644 --- a/pageserver/src/layered_repository/delta_layer.rs +++ b/pageserver/src/layered_repository/delta_layer.rs @@ -148,6 +148,10 @@ pub struct DeltaLayerInner { } impl Layer for DeltaLayer { + fn get_tenant_id(&self) -> ZTenantId { + self.tenantid + } + fn get_timeline_id(&self) -> ZTimelineId { self.timelineid } diff --git a/pageserver/src/layered_repository/global_layer_map.rs b/pageserver/src/layered_repository/global_layer_map.rs index 195bb679be..9ad4bd6f44 100644 --- a/pageserver/src/layered_repository/global_layer_map.rs +++ b/pageserver/src/layered_repository/global_layer_map.rs @@ -2,22 +2,36 @@ //! Global registry of open layers. //! //! Whenever a new in-memory layer is created to hold incoming WAL, it is registered -//! in [`GLOBAL_LAYER_MAP`], so that we can keep track of the total number of in-memory layers -//! in the system, and know when we need to evict some to release memory. +//! in [`GLOBAL_LAYER_MAP`], so that we can keep track of the total number of +//! in-memory layers in the system, and know when we need to evict some to release +//! memory. //! //! Each layer is assigned a unique ID when it's registered in the global registry. //! The ID can be used to relocate the layer later, without having to hold locks. +//! +use std::sync::atomic::{AtomicU8, AtomicUsize, Ordering}; use std::sync::{Arc, RwLock}; use super::inmemory_layer::InMemoryLayer; use lazy_static::lazy_static; +const MAX_USAGE_COUNT: u8 = 5; + lazy_static! { pub static ref GLOBAL_LAYER_MAP: RwLock = RwLock::new(OpenLayers::default()); } +/// +/// How much memory is being used by all the open layers? This is used to trigger +/// freezing and evicting an open layer to disk. +/// +/// This is only a rough approximation, it leaves out a lot of things like malloc() +/// overhead. But as long there is enough "slop" and it's not set too close to the RAM +/// size on the system, it's good enough. +pub static GLOBAL_OPEN_MEM_USAGE: AtomicUsize = AtomicUsize::new(0); + // TODO these types can probably be smaller #[derive(PartialEq, Eq, Clone, Copy)] pub struct LayerId { @@ -35,11 +49,14 @@ enum SlotData { struct Slot { tag: u64, data: SlotData, + usage_count: AtomicU8, // for clock algorithm } #[derive(Default)] pub struct OpenLayers { slots: Vec, + num_occupied: usize, + next_victim: AtomicUsize, // Head of free-slot list. next_empty_slot_idx: Option, @@ -54,21 +71,29 @@ impl OpenLayers { self.slots.push(Slot { tag: 0, data: SlotData::Vacant(None), + usage_count: AtomicU8::new(0), }); idx } }; + let slots_len = self.slots.len(); let slot = &mut self.slots[slot_idx]; match slot.data { - SlotData::Occupied(_) => unimplemented!(), + SlotData::Occupied(_) => { + panic!("an occupied slot was in the free list"); + } SlotData::Vacant(next_empty_slot_idx) => { self.next_empty_slot_idx = next_empty_slot_idx; } } slot.data = SlotData::Occupied(layer); + slot.usage_count.store(1, Ordering::Relaxed); + + self.num_occupied += 1; + assert!(self.num_occupied <= slots_len); LayerId { index: slot_idx, @@ -83,12 +108,81 @@ impl OpenLayers { } if let SlotData::Occupied(layer) = &slot.data { + let _ = slot.usage_count.fetch_update( + Ordering::Relaxed, + Ordering::Relaxed, + |old_usage_count| { + if old_usage_count < MAX_USAGE_COUNT { + Some(old_usage_count + 1) + } else { + None + } + }, + ); Some(Arc::clone(layer)) } else { None } } + /// Find a victim layer to evict, if the total memory usage of all open layers + /// is larger than 'limit' + pub fn find_victim_if_needed(&self, limit: usize) -> Option<(LayerId, Arc)> { + let mem_usage = GLOBAL_OPEN_MEM_USAGE.load(Ordering::Relaxed); + + if mem_usage > limit { + self.find_victim() + } else { + None + } + } + + pub fn find_victim(&self) -> Option<(LayerId, Arc)> { + if self.num_occupied == 0 { + return None; + } + + // Run the clock algorithm. + // + // FIXME: It's theoretically possible that a constant stream of get() requests + // comes in faster than we advance the clock hand, so that this never finishes. + loop { + // FIXME: Because we interpret the clock hand variable modulo slots.len(), the + // hand effectively jumps to a more or less random place whenever the array is + // expanded. That's relatively harmless, it just leads to a non-optimal choice + // of victim. Also, in a server that runs for long enough, the array should reach + // a steady-state size and not grow anymore. + let next_victim = self.next_victim.fetch_add(1, Ordering::Relaxed) % self.slots.len(); + + let slot = &self.slots[next_victim]; + + if let SlotData::Occupied(data) = &slot.data { + fn update_fn(old_usage_count: u8) -> Option { + if old_usage_count > 0 { + Some(old_usage_count - 1) + } else { + None + } + } + + if slot + .usage_count + .fetch_update(Ordering::Relaxed, Ordering::Relaxed, update_fn) + .is_err() + { + // Found a slot with usage_count == 0. Return it. + return Some(( + LayerId { + index: next_victim, + tag: slot.tag, + }, + Arc::clone(data), + )); + } + } + } + } + // TODO this won't be a public API in the future pub fn remove(&mut self, layer_id: &LayerId) { let slot = &mut self.slots[layer_id.index]; @@ -107,6 +201,9 @@ impl OpenLayers { slot.data = SlotData::Vacant(self.next_empty_slot_idx); self.next_empty_slot_idx = Some(layer_id.index); + assert!(self.num_occupied > 0); + self.num_occupied -= 1; + slot.tag = slot.tag.wrapping_add(1); } } diff --git a/pageserver/src/layered_repository/image_layer.rs b/pageserver/src/layered_repository/image_layer.rs index 744f793558..179b66853e 100644 --- a/pageserver/src/layered_repository/image_layer.rs +++ b/pageserver/src/layered_repository/image_layer.rs @@ -117,6 +117,10 @@ impl Layer for ImageLayer { PathBuf::from(self.layer_name().to_string()) } + fn get_tenant_id(&self) -> ZTenantId { + self.tenantid + } + fn get_timeline_id(&self) -> ZTimelineId { self.timelineid } diff --git a/pageserver/src/layered_repository/inmemory_layer.rs b/pageserver/src/layered_repository/inmemory_layer.rs index 474eef09c4..028e72267c 100644 --- a/pageserver/src/layered_repository/inmemory_layer.rs +++ b/pageserver/src/layered_repository/inmemory_layer.rs @@ -3,6 +3,7 @@ //! are held in a BTreeMap, and there's another BTreeMap to track the size of the relation. //! use crate::layered_repository::filename::DeltaFileName; +use crate::layered_repository::global_layer_map::GLOBAL_OPEN_MEM_USAGE; use crate::layered_repository::storage_layer::{ Layer, PageReconstructData, PageReconstructResult, PageVersion, SegmentTag, RELISH_SEG_SIZE, }; @@ -16,6 +17,7 @@ use anyhow::{bail, ensure, Result}; use bytes::Bytes; use log::*; use std::path::PathBuf; +use std::sync::atomic::Ordering; use std::sync::{Arc, RwLock}; use zenith_utils::vec_map::VecMap; @@ -69,6 +71,15 @@ pub struct InMemoryLayerInner { /// a non-blocky rel, 'segsizes' is not used and is always empty. /// segsizes: VecMap, + + /// Approximate amount of memory used by this layer. + /// + /// TODO: This is currently a very crude metric, we don't take into account allocator + /// overhead, memory fragmentation, memory used by the VecMaps, nor many other things. + /// Just the actual # of bytes of a page image (8 kB) or the size of a WAL record. + /// + /// Whenever this is changed, you must also modify GLOBAL_OPEN_MEM_USAGE accordingly! + mem_usage: usize, } impl InMemoryLayerInner { @@ -89,6 +100,15 @@ impl InMemoryLayerInner { } } +impl Drop for InMemoryLayerInner { + fn drop(&mut self) { + if self.mem_usage > 0 { + GLOBAL_OPEN_MEM_USAGE.fetch_sub(self.mem_usage, Ordering::Relaxed); + self.mem_usage = 0; + } + } +} + impl Layer for InMemoryLayer { // An in-memory layer doesn't really have a filename as it's not stored on disk, // but we construct a filename as if it was a delta layer @@ -113,6 +133,10 @@ impl Layer for InMemoryLayer { PathBuf::from(format!("inmem-{}", delta_filename)) } + fn get_tenant_id(&self) -> ZTenantId { + self.tenantid + } + fn get_timeline_id(&self) -> ZTimelineId { self.timelineid } @@ -281,12 +305,6 @@ pub struct LayersOnDisk { pub image_layers: Vec, } -impl LayersOnDisk { - pub fn is_empty(&self) -> bool { - self.delta_layers.is_empty() && self.image_layers.is_empty() - } -} - impl InMemoryLayer { /// Return the oldest page version that's stored in this layer pub fn get_oldest_pending_lsn(&self) -> Lsn { @@ -330,6 +348,7 @@ impl InMemoryLayer { dropped: false, page_versions: PageVersions::default(), segsizes, + mem_usage: 0, }), }) } @@ -376,6 +395,13 @@ impl InMemoryLayer { inner.assert_writeable(); + let mut mem_usage = 0; + if let Some(img) = &pv.page_image { + mem_usage += img.len(); + } else if let Some(rec) = &pv.record { + mem_usage += rec.rec.len(); + } + let old = inner.page_versions.append_or_update_last(blknum, lsn, pv); if old.is_some() { @@ -386,6 +412,9 @@ impl InMemoryLayer { ); } + inner.mem_usage += mem_usage; + GLOBAL_OPEN_MEM_USAGE.fetch_add(mem_usage, Ordering::Relaxed); + // Also update the relation size, if this extended the relation. if self.seg.rel.is_blocky() { let newsize = blknum - self.seg.segno * RELISH_SEG_SIZE + 1; @@ -521,6 +550,7 @@ impl InMemoryLayer { dropped: false, page_versions: PageVersions::default(), segsizes, + mem_usage: 0, }), }) } @@ -550,6 +580,16 @@ impl InMemoryLayer { for (_blk, lsn, _pv) in inner.page_versions.ordered_page_version_iter(None) { assert!(lsn <= end_lsn); } + + // It's a bit premature to subtract the global mem usage here already. + // This layer consumes memory until it's written out to disk and dropped. + // But GLOBAL_OPEN_MEM_USAGE is used to trigger layer eviction, if there are + // too many open layers, and from that point of view this should no longer be + // counted against the global mem usage. + if inner.mem_usage > 0 { + GLOBAL_OPEN_MEM_USAGE.fetch_sub(inner.mem_usage, Ordering::Relaxed); + inner.mem_usage = 0; + } } } diff --git a/pageserver/src/layered_repository/storage_layer.rs b/pageserver/src/layered_repository/storage_layer.rs index 0a86fe407d..9bbe9cd76d 100644 --- a/pageserver/src/layered_repository/storage_layer.rs +++ b/pageserver/src/layered_repository/storage_layer.rs @@ -4,7 +4,7 @@ use crate::relish::RelishTag; use crate::repository::WALRecord; -use crate::ZTimelineId; +use crate::{ZTenantId, ZTimelineId}; use anyhow::Result; use bytes::Bytes; use serde::{Deserialize, Serialize}; @@ -104,6 +104,8 @@ pub enum PageReconstructResult { /// in-memory and on-disk layers. /// pub trait Layer: Send + Sync { + fn get_tenant_id(&self) -> ZTenantId; + /// Identify the timeline this relish belongs to fn get_timeline_id(&self) -> ZTimelineId; diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 114630a624..50b4c4dfda 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -43,6 +43,8 @@ pub mod defaults { pub const DEFAULT_SUPERUSER: &str = "zenith_admin"; pub const DEFAULT_RELISH_STORAGE_MAX_CONCURRENT_SYNC_LIMITS: usize = 100; + + pub const DEFAULT_OPEN_MEM_LIMIT: usize = 128 * 1024 * 1024; } lazy_static! { @@ -71,6 +73,8 @@ pub struct PageServerConf { pub gc_period: Duration, pub superuser: String, + pub open_mem_limit: usize, + // Repository directory, relative to current working directory. // Normally, the page server changes the current working directory // to the repository, and 'workdir' is always '.'. But we don't do @@ -153,6 +157,7 @@ impl PageServerConf { checkpoint_period: Duration::from_secs(10), gc_horizon: defaults::DEFAULT_GC_HORIZON, gc_period: Duration::from_secs(10), + open_mem_limit: defaults::DEFAULT_OPEN_MEM_LIMIT, listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(), listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(), superuser: "zenith_admin".to_string(), diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index b093833b93..c925c45200 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -160,6 +160,9 @@ pub trait Timeline: Send + Sync { /// Does the same as get_current_logical_size but counted on demand. /// Used in tests to ensure thet incremental and non incremental variants match. fn get_current_logical_size_non_incremental(&self, lsn: Lsn) -> Result; + + /// An escape hatch to allow "casting" a generic Timeline to LayeredTimeline. + fn upgrade_to_layered_timeline(&self) -> &crate::layered_repository::LayeredTimeline; } /// Various functions to mutate the timeline. diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 1f84ed8507..d6212a5f54 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -5,6 +5,7 @@ //! //! We keep one WAL receiver active per timeline. +use crate::layered_repository; use crate::relish::*; use crate::restore_local_repo; use crate::tenant_mgr; @@ -175,7 +176,7 @@ fn thread_main(conf: &'static PageServerConf, timelineid: ZTimelineId, tenantid: } fn walreceiver_main( - _conf: &PageServerConf, + conf: &PageServerConf, timelineid: ZTimelineId, wal_producer_connstr: &str, tenantid: ZTenantId, @@ -295,6 +296,9 @@ fn walreceiver_main( caught_up = true; } + // Release memory if needed + layered_repository::evict_layer_if_needed(conf)?; + Some(endlsn) }