Implement backpressure for compute node to avoid WAL overflow

This commit is contained in:
Konstantin Knizhnik
2021-09-18 15:02:07 +03:00
parent e53dd5240a
commit 6056ce7602
6 changed files with 11 additions and 141 deletions

View File

@@ -322,7 +322,7 @@ impl PostgresNode {
shared_buffers = 1MB\n\
fsync = off\n\
max_connections = 100\n\
wal_sender_timeout = 0\n\
wal_sender_timeout = 10s\n\
wal_level = replica\n\
listen_addresses = '{address}'\n\
port = {port}\n",

View File

@@ -70,9 +70,6 @@ static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; 8192]);
// Timeout when waiting for WAL receiver to catch up to an LSN given in a GetPage@LSN call.
static TIMEOUT: Duration = Duration::from_secs(60);
const MIN_ADD_LAYER_DELAY: u64 = 1000; // milliseconds
const MAX_ADD_LAYER_DELAY: u64 = 10000; // milliseconds
// Metrics collected on operations on the storage repository.
lazy_static! {
static ref STORAGE_TIME: HistogramVec = register_histogram_vec!(
@@ -948,6 +945,10 @@ impl Timeline for LayeredTimeline {
Ok(total_blocks * BLCKSZ as usize)
}
fn get_disk_consistent_lsn(&self) -> Lsn {
self.disk_consistent_lsn.load()
}
}
impl LayeredTimeline {
@@ -1235,35 +1236,6 @@ impl LayeredTimeline {
self.timelineid,
lsn
);
if !delayed {
if let Some((oldest_layer, _oldest_generation)) = layers.peek_oldest_open() {
let oldest_pending_lsn = oldest_layer.get_oldest_pending_lsn();
let distance = lsn.widening_sub(oldest_pending_lsn);
let excess_factor = distance / self.conf.checkpoint_distance as i128 - 1;
if excess_factor > 0 {
// Memory layers consume two much memory because checkpointer
// is not able to keep up with wal receiveer and flushes inmemory layers with
// the same speed. So we have to slowdown receiver.
// But we can not delay receiver too long because get_page_at_lsn may wait
// for most recent WAL records with can nto be receiver because receiver is blocked.
// It may cause timeout exitration in wait_lsn and so page access error.
// So we increase timeout proprtionally to memory limit excess bit limit maximal value of delay.
let timeout = std::cmp::min(
MIN_ADD_LAYER_DELAY * (excess_factor as u64),
MAX_ADD_LAYER_DELAY,
);
info!(
"Delay wal receiver: distance={}, excess_factor={}, timeout={}",
distance, excess_factor, timeout
);
delayed = true;
drop(layers);
thread::sleep(Duration::from_millis(timeout));
layers = self.layers.lock().unwrap();
continue;
}
}
}
layer = InMemoryLayer::create(
self.conf,
self.timelineid,

View File

@@ -33,10 +33,10 @@ pub mod defaults {
// would be more appropriate. But a low value forces the code to be exercised more,
// which is good for now to trigger bugs.
pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 256 * 1024 * 1024;
pub const DEFAULT_CHECKPOINT_PERIOD: Duration = Duration::from_secs(10);
pub const DEFAULT_CHECKPOINT_PERIOD: Duration = Duration::from_secs(1);
pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(10);
pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(100);
pub const DEFAULT_SUPERUSER: &str = "zenith_admin";
}

View File

@@ -149,6 +149,7 @@ pub trait Timeline: Send + Sync {
fn get_last_record_lsn(&self) -> Lsn;
fn get_prev_record_lsn(&self) -> Lsn;
fn get_start_lsn(&self) -> Lsn;
fn get_disk_consistent_lsn(&self) -> Lsn;
///
/// Flush to disk all data that was written with the put_* functions

View File

@@ -192,15 +192,6 @@ fn walreceiver_main(
let endlsn = startlsn + data.len() as u64;
let prev_last_rec_lsn = last_rec_lsn;
write_wal_file(
conf,
startlsn,
&timelineid,
pg_constants::WAL_SEGMENT_SIZE,
data,
&tenantid,
)?;
trace!("received XLogData between {} and {}", startlsn, endlsn);
waldecoder.feed_bytes(data);
@@ -301,12 +292,12 @@ fn walreceiver_main(
if let Some(last_lsn) = status_update {
// TODO: More thought should go into what values are sent here.
let last_lsn = PgLsn::from(u64::from(last_lsn));
let write_lsn = last_lsn;
let write_lsn = PgLsn::from(u64::from(timeline.get_disk_consistent_lsn()));
let flush_lsn = last_lsn;
let apply_lsn = PgLsn::from(0);
let ts = SystemTime::now();
const NO_REPLY: u8 = 0;
info!("Send write lsn={}", write_lsn);
physical_stream.standby_status_update(write_lsn, flush_lsn, apply_lsn, ts, NO_REPLY)?;
}
}
@@ -401,97 +392,3 @@ pub fn identify_system(client: &mut Client) -> Result<IdentifySystem, Error> {
}
}
fn write_wal_file(
conf: &PageServerConf,
startpos: Lsn,
timelineid: &ZTimelineId,
wal_seg_size: usize,
buf: &[u8],
tenantid: &ZTenantId,
) -> anyhow::Result<()> {
let mut bytes_left: usize = buf.len();
let mut bytes_written: usize = 0;
let mut partial;
let mut start_pos = startpos;
const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ];
let wal_dir = conf.wal_dir_path(timelineid, tenantid);
/* Extract WAL location for this block */
let mut xlogoff = start_pos.segment_offset(wal_seg_size);
while bytes_left != 0 {
let bytes_to_write;
/*
* If crossing a WAL boundary, only write up until we reach wal
* segment size.
*/
if xlogoff + bytes_left > wal_seg_size {
bytes_to_write = wal_seg_size - xlogoff;
} else {
bytes_to_write = bytes_left;
}
/* Open file */
let segno = start_pos.segment_number(wal_seg_size);
let wal_file_name = XLogFileName(
1, // FIXME: always use Postgres timeline 1
segno,
wal_seg_size,
);
let wal_file_path = wal_dir.join(wal_file_name.clone());
let wal_file_partial_path = wal_dir.join(wal_file_name.clone() + ".partial");
{
let mut wal_file: File;
/* Try to open already completed segment */
if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path) {
wal_file = file;
partial = false;
} else if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_partial_path) {
/* Try to open existed partial file */
wal_file = file;
partial = true;
} else {
/* Create and fill new partial file */
partial = true;
match OpenOptions::new()
.create(true)
.write(true)
.open(&wal_file_partial_path)
{
Ok(mut file) => {
for _ in 0..(wal_seg_size / XLOG_BLCKSZ) {
file.write_all(ZERO_BLOCK)?;
}
wal_file = file;
}
Err(e) => {
error!("Failed to open log file {:?}: {}", &wal_file_path, e);
return Err(e.into());
}
}
}
wal_file.seek(SeekFrom::Start(xlogoff as u64))?;
wal_file.write_all(&buf[bytes_written..(bytes_written + bytes_to_write)])?;
// FIXME: Flush the file
//wal_file.sync_all()?;
}
/* Write was successful, advance our position */
bytes_written += bytes_to_write;
bytes_left -= bytes_to_write;
start_pos += bytes_to_write as u64;
xlogoff += bytes_to_write;
/* Did we reach the end of a WAL segment? */
if start_pos.segment_offset(wal_seg_size) == 0 {
xlogoff = 0;
if partial {
fs::rename(&wal_file_partial_path, &wal_file_path)?;
}
}
}
Ok(())
}