From 09cced085546d496121778c33a4418e0397b5d4b Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Thu, 16 Sep 2021 12:04:39 +0300 Subject: [PATCH] Delay adding new layers if checkpointing is too slow to avoid OOM --- pageserver/src/layered_repository.rs | 148 +++++++++++++++++---------- pageserver/src/lib.rs | 2 +- 2 files changed, 96 insertions(+), 54 deletions(-) diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index eb0e4368f2..b9a3778c9e 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -70,6 +70,9 @@ static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; 8192]); // Timeout when waiting for WAL receiver to catch up to an LSN given in a GetPage@LSN call. static TIMEOUT: Duration = Duration::from_secs(60); +const MIN_ADD_LAYER_DELAY: u64 = 1000; // milliseconds +const MAX_ADD_LAYER_DELAY: u64 = 10000; // milliseconds + // Metrics collected on operations on the storage repository. lazy_static! { static ref STORAGE_TIME: HistogramVec = register_histogram_vec!( @@ -1159,6 +1162,7 @@ impl LayeredTimeline { /// fn get_layer_for_write(&self, seg: SegmentTag, lsn: Lsn) -> Result> { let mut layers = self.layers.lock().unwrap(); + let mut delayed = false; assert!(lsn.is_aligned()); @@ -1171,72 +1175,110 @@ impl LayeredTimeline { ); } - // Do we have a layer open for writing already? - if let Some(layer) = layers.get_open(&seg) { - if layer.get_start_lsn() > lsn { - bail!("unexpected open layer in the future"); + loop { + // Do we have a layer open for writing already? + if let Some(layer) = layers.get_open(&seg) { + if layer.get_start_lsn() > lsn { + bail!("unexpected open layer in the future"); + } + return Ok(layer); } - return Ok(layer); - } - // No (writeable) layer for this relation yet. Create one. 
+ // + // Is this a completely new relation? Or the first modification after branching? + // - let layer; - if let Some((prev_layer, _prev_lsn)) = self.get_layer_for_read_locked(seg, lsn, &layers)? { - // Create new entry after the previous one. - let start_lsn; - if prev_layer.get_timeline_id() != self.timelineid { - // First modification on this timeline - start_lsn = self.ancestor_lsn; + let layer; + if let Some((prev_layer, _prev_lsn)) = + self.get_layer_for_read_locked(seg, lsn, &layers)? + { + // Create new entry after the previous one. + let start_lsn; + if prev_layer.get_timeline_id() != self.timelineid { + // First modification on this timeline + start_lsn = self.ancestor_lsn; + trace!( + "creating file for write for {} at branch point {}/{}", + seg, + self.timelineid, + start_lsn + ); + } else { + start_lsn = prev_layer.get_end_lsn(); + trace!( + "creating file for write for {} after previous layer {}/{}", + seg, + self.timelineid, + start_lsn + ); + } trace!( - "creating file for write for {} at branch point {}/{}", - seg, - self.timelineid, - start_lsn + "prev layer is at {}/{} - {}", + prev_layer.get_timeline_id(), + prev_layer.get_start_lsn(), + prev_layer.get_end_lsn() ); + layer = InMemoryLayer::create_successor_layer( + self.conf, + prev_layer, + self.timelineid, + self.tenantid, + start_lsn, + lsn, + )?; } else { - start_lsn = prev_layer.get_end_lsn(); + // New relation. 
trace!( - "creating file for write for {} after previous layer {}/{}", + "creating layer for write for new rel {} at {}/{}", seg, self.timelineid, - start_lsn + lsn ); + if !delayed { + if let Some((oldest_layer, _oldest_generation)) = layers.peek_oldest_open() { + let oldest_pending_lsn = oldest_layer.get_oldest_pending_lsn(); + let distance = lsn.widening_sub(oldest_pending_lsn); + let excess_factor = distance / conf.checkpoint_distance - 1; + if excess_factor > 0 { + // In-memory layers consume too much memory because the checkpointer + // is not able to keep up with the WAL receiver and flush in-memory layers at + // the same speed. So we have to slow down the receiver. + // But we cannot delay the receiver for too long, because get_page_at_lsn may wait + // for the most recent WAL records, which cannot be received while the receiver is blocked. + // That may cause the timeout in wait_lsn to expire, resulting in a page access error. + // So we increase the delay proportionally to the excess over the memory limit, but cap it at a maximal value. + let timeout = std::cmp::min( + MIN_ADD_LAYER_DELAY * (excess_factor as u64), + MAX_ADD_LAYER_DELAY, + ); + info!( + "Delay wal receiver: distance={}, excess_factor={}, timeout={}", + distance, excess_factor, timeout + ); + delayed = true; + drop(layers); + thread::sleep(Duration::from_millis(timeout)); + layers = self.layers.lock().unwrap(); + continue; + } + } + } + layer = InMemoryLayer::create( + self.conf, + self.timelineid, + self.tenantid, + seg, + lsn, + lsn, + )?; } - trace!( - "prev layer is at {}/{} - {}", - prev_layer.get_timeline_id(), - prev_layer.get_start_lsn(), - prev_layer.get_end_lsn() - ); - layer = InMemoryLayer::create_successor_layer( - self.conf, - prev_layer, - self.timelineid, - self.tenantid, - start_lsn, - lsn, - )?; - } else { - // New relation. 
- trace!( - "creating layer for write for new rel {} at {}/{}", - seg, - self.timelineid, - lsn - ); - layer = - InMemoryLayer::create(self.conf, self.timelineid, self.tenantid, seg, lsn, lsn)?; + let layer_rc: Arc = Arc::new(layer); + layers.insert_open(Arc::clone(&layer_rc)); + + return Ok(layer_rc); } - - let layer_rc: Arc = Arc::new(layer); - layers.insert_open(Arc::clone(&layer_rc)); - - Ok(layer_rc) } /// diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 32d2d66e6b..712616adfd 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -36,7 +36,7 @@ pub mod defaults { pub const DEFAULT_CHECKPOINT_PERIOD: Duration = Duration::from_secs(100); pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024; - pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(100); + pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(10); pub const DEFAULT_SUPERUSER: &str = "zenith_admin"; }