Delay adding new layers if checpointing is too slow to avoid OOM

This commit is contained in:
Konstantin Knizhnik
2021-09-16 12:04:39 +03:00
parent 16d3dc821a
commit 09cced0855
2 changed files with 96 additions and 54 deletions

View File

@@ -70,6 +70,9 @@ static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; 8192]);
// Timeout when waiting for WAL receiver to catch up to an LSN given in a GetPage@LSN call.
static TIMEOUT: Duration = Duration::from_secs(60);
const MIN_ADD_LAYER_DELAY: u64 = 1000; // milliseconds
const MAX_ADD_LAYER_DELAY: u64 = 10000; // milliseconds
// Metrics collected on operations on the storage repository.
lazy_static! {
static ref STORAGE_TIME: HistogramVec = register_histogram_vec!(
@@ -1159,6 +1162,7 @@ impl LayeredTimeline {
///
fn get_layer_for_write(&self, seg: SegmentTag, lsn: Lsn) -> Result<Arc<InMemoryLayer>> {
let mut layers = self.layers.lock().unwrap();
let mut delayed = false;
assert!(lsn.is_aligned());
@@ -1171,72 +1175,110 @@ impl LayeredTimeline {
);
}
// Do we have a layer open for writing already?
if let Some(layer) = layers.get_open(&seg) {
if layer.get_start_lsn() > lsn {
bail!("unexpected open layer in the future");
loop {
// Do we have a layer open for writing already?
if let Some(layer) = layers.get_open(&seg) {
if layer.get_start_lsn() > lsn {
bail!("unexpected open layer in the future");
}
return Ok(layer);
}
return Ok(layer);
}
// No (writeable) layer for this relation yet. Create one.
//
// Is this a completely new relation? Or the first modification after branching?
//
// No (writeable) layer for this relation yet. Create one.
//
// Is this a completely new relation? Or the first modification after branching?
//
let layer;
if let Some((prev_layer, _prev_lsn)) = self.get_layer_for_read_locked(seg, lsn, &layers)? {
// Create new entry after the previous one.
let start_lsn;
if prev_layer.get_timeline_id() != self.timelineid {
// First modification on this timeline
start_lsn = self.ancestor_lsn;
let layer;
if let Some((prev_layer, _prev_lsn)) =
self.get_layer_for_read_locked(seg, lsn, &layers)?
{
// Create new entry after the previous one.
let start_lsn;
if prev_layer.get_timeline_id() != self.timelineid {
// First modification on this timeline
start_lsn = self.ancestor_lsn;
trace!(
"creating file for write for {} at branch point {}/{}",
seg,
self.timelineid,
start_lsn
);
} else {
start_lsn = prev_layer.get_end_lsn();
trace!(
"creating file for write for {} after previous layer {}/{}",
seg,
self.timelineid,
start_lsn
);
}
trace!(
"creating file for write for {} at branch point {}/{}",
seg,
self.timelineid,
start_lsn
"prev layer is at {}/{} - {}",
prev_layer.get_timeline_id(),
prev_layer.get_start_lsn(),
prev_layer.get_end_lsn()
);
layer = InMemoryLayer::create_successor_layer(
self.conf,
prev_layer,
self.timelineid,
self.tenantid,
start_lsn,
lsn,
)?;
} else {
start_lsn = prev_layer.get_end_lsn();
// New relation.
trace!(
"creating file for write for {} after previous layer {}/{}",
"creating layer for write for new rel {} at {}/{}",
seg,
self.timelineid,
start_lsn
lsn
);
if !delayed {
if let Some((oldest_layer, _oldest_generation)) = layers.peek_oldest_open() {
let oldest_pending_lsn = oldest_layer.get_oldest_pending_lsn();
let distance = lsn.widening_sub(oldest_pending_lsn);
let excess_factor = distance / conf.checkpoint_distance - 1;
if excess_factor > 0 {
// Memory layers consume two much memory because checkpointer
// is not able to keep up with wal receiveer and flushes inmemory layers with
// the same speed. So we have to slowdown receiver.
// But we can not delay receiver too long because get_page_at_lsn may wait
// for most recent WAL records with can nto be receiver because receiver is blocked.
// It may cause timeout exitration in wait_lsn and so page access error.
// So we increase timeout proprtionally to memory limit excess bit limit maximal value of delay.
let timeout = std::cmp::min(
MIN_ADD_LAYER_DELAY * (excess_factor as u64),
MAX_ADD_LAYER_DELAY,
);
info!(
"Delay wal receiver: distance={}, excess_factor={}, timeout={}",
distance, excess_factor, timeout
);
delayed = true;
drop(layers);
thread::sleep(Duration::from_millis(timeout));
layers = self.layers.lock().unwrap();
continue;
}
}
}
layer = InMemoryLayer::create(
self.conf,
self.timelineid,
self.tenantid,
seg,
lsn,
lsn,
)?;
}
trace!(
"prev layer is at {}/{} - {}",
prev_layer.get_timeline_id(),
prev_layer.get_start_lsn(),
prev_layer.get_end_lsn()
);
layer = InMemoryLayer::create_successor_layer(
self.conf,
prev_layer,
self.timelineid,
self.tenantid,
start_lsn,
lsn,
)?;
} else {
// New relation.
trace!(
"creating layer for write for new rel {} at {}/{}",
seg,
self.timelineid,
lsn
);
layer =
InMemoryLayer::create(self.conf, self.timelineid, self.tenantid, seg, lsn, lsn)?;
let layer_rc: Arc<InMemoryLayer> = Arc::new(layer);
layers.insert_open(Arc::clone(&layer_rc));
return Ok(layer_rc);
}
let layer_rc: Arc<InMemoryLayer> = Arc::new(layer);
layers.insert_open(Arc::clone(&layer_rc));
Ok(layer_rc)
}
///

View File

@@ -36,7 +36,7 @@ pub mod defaults {
pub const DEFAULT_CHECKPOINT_PERIOD: Duration = Duration::from_secs(100);
pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(100);
pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(10);
pub const DEFAULT_SUPERUSER: &str = "zenith_admin";
}