From 6056ce76024565c3e030f9c3d8f898187ba9d6ea Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Sat, 18 Sep 2021 15:02:07 +0300 Subject: [PATCH] Implement backpressure for compute node to avoid WAL overflow --- control_plane/src/compute.rs | 2 +- pageserver/src/layered_repository.rs | 36 +-------- pageserver/src/lib.rs | 4 +- pageserver/src/repository.rs | 1 + pageserver/src/walreceiver.rs | 107 +-------------------------- vendor/postgres | 2 +- 6 files changed, 11 insertions(+), 141 deletions(-) diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs index 200dcd02e6..18616752f1 100644 --- a/control_plane/src/compute.rs +++ b/control_plane/src/compute.rs @@ -322,7 +322,7 @@ impl PostgresNode { shared_buffers = 1MB\n\ fsync = off\n\ max_connections = 100\n\ - wal_sender_timeout = 0\n\ + wal_sender_timeout = 10s\n\ wal_level = replica\n\ listen_addresses = '{address}'\n\ port = {port}\n", diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index e4614d41e6..65e9246996 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -70,9 +70,6 @@ static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; 8192]); // Timeout when waiting for WAL receiver to catch up to an LSN given in a GetPage@LSN call. static TIMEOUT: Duration = Duration::from_secs(60); -const MIN_ADD_LAYER_DELAY: u64 = 1000; // milliseconds -const MAX_ADD_LAYER_DELAY: u64 = 10000; // milliseconds - // Metrics collected on operations on the storage repository. lazy_static! { static ref STORAGE_TIME: HistogramVec = register_histogram_vec!( @@ -948,6 +945,10 @@ impl Timeline for LayeredTimeline { Ok(total_blocks * BLCKSZ as usize) } + + fn get_disk_consistent_lsn(&self) -> Lsn { + self.disk_consistent_lsn.load() + } } impl LayeredTimeline { @@ -1235,35 +1236,6 @@ impl LayeredTimeline { self.timelineid, lsn ); - if !delayed { - if let Some((oldest_layer, _oldest_generation)) = layers.peek_oldest_open() { - let oldest_pending_lsn = oldest_layer.get_oldest_pending_lsn(); - let distance = lsn.widening_sub(oldest_pending_lsn); - let excess_factor = distance / self.conf.checkpoint_distance as i128 - 1; - if excess_factor > 0 { - // Memory layers consume two much memory because checkpointer - // is not able to keep up with wal receiveer and flushes inmemory layers with - // the same speed. So we have to slowdown receiver. - // But we can not delay receiver too long because get_page_at_lsn may wait - // for most recent WAL records with can nto be receiver because receiver is blocked. - // It may cause timeout exitration in wait_lsn and so page access error. - // So we increase timeout proprtionally to memory limit excess bit limit maximal value of delay. - let timeout = std::cmp::min( - MIN_ADD_LAYER_DELAY * (excess_factor as u64), - MAX_ADD_LAYER_DELAY, - ); - info!( - "Delay wal receiver: distance={}, excess_factor={}, timeout={}", - distance, excess_factor, timeout - ); - delayed = true; - drop(layers); - thread::sleep(Duration::from_millis(timeout)); - layers = self.layers.lock().unwrap(); - continue; - } - } - } layer = InMemoryLayer::create( self.conf, self.timelineid, diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index ee1a7e6a0b..eff9b54897 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -33,10 +33,10 @@ pub mod defaults { // would be more appropriate. But a low value forces the code to be exercised more, // which is good for now to trigger bugs. pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 256 * 1024 * 1024; - pub const DEFAULT_CHECKPOINT_PERIOD: Duration = Duration::from_secs(10); + pub const DEFAULT_CHECKPOINT_PERIOD: Duration = Duration::from_secs(1); pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024; - pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(10); + pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(100); pub const DEFAULT_SUPERUSER: &str = "zenith_admin"; } diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index f0dad8a91e..1efbb37d0c 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -149,6 +149,7 @@ pub trait Timeline: Send + Sync { fn get_last_record_lsn(&self) -> Lsn; fn get_prev_record_lsn(&self) -> Lsn; fn get_start_lsn(&self) -> Lsn; + fn get_disk_consistent_lsn(&self) -> Lsn; /// /// Flush to disk all data that was written with the put_* functions diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index fc9afb95cf..a34964331f 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -192,15 +192,6 @@ fn walreceiver_main( let endlsn = startlsn + data.len() as u64; let prev_last_rec_lsn = last_rec_lsn; - write_wal_file( - conf, - startlsn, - &timelineid, - pg_constants::WAL_SEGMENT_SIZE, - data, - &tenantid, - )?; - trace!("received XLogData between {} and {}", startlsn, endlsn); waldecoder.feed_bytes(data); @@ -301,12 +292,12 @@ fn walreceiver_main( if let Some(last_lsn) = status_update { // TODO: More thought should go into what values are sent here. let last_lsn = PgLsn::from(u64::from(last_lsn)); - let write_lsn = last_lsn; + let write_lsn = PgLsn::from(u64::from(timeline.get_disk_consistent_lsn())); let flush_lsn = last_lsn; let apply_lsn = PgLsn::from(0); let ts = SystemTime::now(); const NO_REPLY: u8 = 0; - + info!("Send write lsn={}", write_lsn); physical_stream.standby_status_update(write_lsn, flush_lsn, apply_lsn, ts, NO_REPLY)?; } } @@ -401,97 +392,3 @@ pub fn identify_system(client: &mut Client) -> Result { } } -fn write_wal_file( - conf: &PageServerConf, - startpos: Lsn, - timelineid: &ZTimelineId, - wal_seg_size: usize, - buf: &[u8], - tenantid: &ZTenantId, -) -> anyhow::Result<()> { - let mut bytes_left: usize = buf.len(); - let mut bytes_written: usize = 0; - let mut partial; - let mut start_pos = startpos; - const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ]; - - let wal_dir = conf.wal_dir_path(timelineid, tenantid); - - /* Extract WAL location for this block */ - let mut xlogoff = start_pos.segment_offset(wal_seg_size); - - while bytes_left != 0 { - let bytes_to_write; - - /* - * If crossing a WAL boundary, only write up until we reach wal - * segment size. - */ - if xlogoff + bytes_left > wal_seg_size { - bytes_to_write = wal_seg_size - xlogoff; - } else { - bytes_to_write = bytes_left; - } - - /* Open file */ - let segno = start_pos.segment_number(wal_seg_size); - let wal_file_name = XLogFileName( - 1, // FIXME: always use Postgres timeline 1 - segno, - wal_seg_size, - ); - let wal_file_path = wal_dir.join(wal_file_name.clone()); - let wal_file_partial_path = wal_dir.join(wal_file_name.clone() + ".partial"); - - { - let mut wal_file: File; - /* Try to open already completed segment */ - if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path) { - wal_file = file; - partial = false; - } else if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_partial_path) { - /* Try to open existed partial file */ - wal_file = file; - partial = true; - } else { - /* Create and fill new partial file */ - partial = true; - match OpenOptions::new() - .create(true) - .write(true) - .open(&wal_file_partial_path) - { - Ok(mut file) => { - for _ in 0..(wal_seg_size / XLOG_BLCKSZ) { - file.write_all(ZERO_BLOCK)?; - } - wal_file = file; - } - Err(e) => { - error!("Failed to open log file {:?}: {}", &wal_file_path, e); - return Err(e.into()); - } - } - } - wal_file.seek(SeekFrom::Start(xlogoff as u64))?; - wal_file.write_all(&buf[bytes_written..(bytes_written + bytes_to_write)])?; - - // FIXME: Flush the file - //wal_file.sync_all()?; - } - /* Write was successful, advance our position */ - bytes_written += bytes_to_write; - bytes_left -= bytes_to_write; - start_pos += bytes_to_write as u64; - xlogoff += bytes_to_write; - - /* Did we reach the end of a WAL segment? */ - if start_pos.segment_offset(wal_seg_size) == 0 { - xlogoff = 0; - if partial { - fs::rename(&wal_file_partial_path, &wal_file_path)?; - } - } - } - Ok(()) -} diff --git a/vendor/postgres b/vendor/postgres index 9374fe0963..ab3b12fa2b 160000 --- a/vendor/postgres +++ b/vendor/postgres @@ -1 +1 @@ -Subproject commit 9374fe0963907770132e99a57dfe24497f28392f +Subproject commit ab3b12fa2b6f4ea9b3364dd3bdbec46e64e0b4d8