From ca9af3747800b81f57c6e2374a0d19fccd7fd8d9 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Mon, 27 Sep 2021 14:15:55 +0300 Subject: [PATCH] Do not write WAL at pageserver (#645) * Do not write WAL at pageserver * Remove import_timeline_wal function --- pageserver/src/branches.rs | 4 - pageserver/src/layered_repository.rs | 6 -- pageserver/src/lib.rs | 4 +- pageserver/src/restore_local_repo.rs | 98 +------------------------ pageserver/src/walreceiver.rs | 106 --------------------------- 5 files changed, 3 insertions(+), 215 deletions(-) diff --git a/pageserver/src/branches.rs b/pageserver/src/branches.rs index 4094757f31..cc1ca464ad 100644 --- a/pageserver/src/branches.rs +++ b/pageserver/src/branches.rs @@ -222,10 +222,6 @@ fn bootstrap_timeline( // LSN, and any WAL after that. let timeline = repo.create_empty_timeline(tli)?; restore_local_repo::import_timeline_from_postgres_datadir(&pgdata_path, &*timeline, lsn)?; - - let wal_dir = pgdata_path.join("pg_wal"); - restore_local_repo::import_timeline_wal(&wal_dir, &*timeline, lsn)?; - timeline.checkpoint()?; println!( diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index 267e30d706..fbf7a9de1f 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -35,7 +35,6 @@ use crate::layered_repository::inmemory_layer::FreezeLayers; use crate::relish::*; use crate::relish_storage::storage_uploader::QueueBasedRelishUploader; use crate::repository::{GcResult, Repository, Timeline, WALRecord}; -use crate::restore_local_repo::import_timeline_wal; use crate::walredo::WalRedoManager; use crate::PageServerConf; use crate::{ZTenantId, ZTimelineId}; @@ -254,11 +253,6 @@ impl LayeredRepository { timelineid, timeline.get_last_record_lsn() ); - let wal_dir = self - .conf - .timeline_path(&timelineid, &self.tenantid) - .join("wal"); - import_timeline_wal(&wal_dir, timeline.as_ref(), timeline.get_last_record_lsn())?; if cfg!(debug_assertions) { // check again after wal loading diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 32d2d66e6b..eff9b54897 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -32,8 +32,8 @@ pub mod defaults { // FIXME: This current value is very low. I would imagine something like 1 GB or 10 GB // would be more appropriate. But a low value forces the code to be exercised more, // which is good for now to trigger bugs. - pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 64 * 1024 * 1024; - pub const DEFAULT_CHECKPOINT_PERIOD: Duration = Duration::from_secs(100); + pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 256 * 1024 * 1024; + pub const DEFAULT_CHECKPOINT_PERIOD: Duration = Duration::from_secs(1); pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024; pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(100); diff --git a/pageserver/src/restore_local_repo.rs b/pageserver/src/restore_local_repo.rs index ff394e8094..9cafa23e0e 100644 --- a/pageserver/src/restore_local_repo.rs +++ b/pageserver/src/restore_local_repo.rs @@ -9,9 +9,7 @@ use std::cmp::min; use std::fs; use std::fs::File; use std::io::Read; -use std::io::Seek; -use std::io::SeekFrom; -use std::path::{Path, PathBuf}; +use std::path::Path; use anyhow::Result; use bytes::{Buf, Bytes}; @@ -283,100 +281,6 @@ fn import_slru_file(timeline: &dyn Timeline, lsn: Lsn, slru: SlruKind, path: &Pa Ok(()) } -/// Scan PostgreSQL WAL files in given directory -/// and load all records >= 'startpoint' into the repository. -pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint: Lsn) -> Result<()> { - let mut waldecoder = WalStreamDecoder::new(startpoint); - - let mut segno = startpoint.segment_number(pg_constants::WAL_SEGMENT_SIZE); - let mut offset = startpoint.segment_offset(pg_constants::WAL_SEGMENT_SIZE); - let mut last_lsn = startpoint; - - let checkpoint_bytes = timeline.get_page_at_lsn(RelishTag::Checkpoint, 0, startpoint)?; - let mut checkpoint = CheckPoint::decode(&checkpoint_bytes)?; - - loop { - // FIXME: assume postgresql tli 1 for now - let filename = XLogFileName(1, segno, pg_constants::WAL_SEGMENT_SIZE); - let mut buf = Vec::new(); - - //Read local file - let mut path = walpath.join(&filename); - - // It could be as .partial - if !PathBuf::from(&path).exists() { - path = walpath.join(filename + ".partial"); - } - - // Slurp the WAL file - let open_result = File::open(&path); - if let Err(e) = &open_result { - if e.kind() == std::io::ErrorKind::NotFound { - break; - } - } - let mut file = open_result?; - - if offset > 0 { - file.seek(SeekFrom::Start(offset as u64))?; - } - - let nread = file.read_to_end(&mut buf)?; - if nread != pg_constants::WAL_SEGMENT_SIZE - offset as usize { - // Maybe allow this for .partial files? - error!("read only {} bytes from WAL file", nread); - } - - waldecoder.feed_bytes(&buf); - - let mut nrecords = 0; - loop { - let rec = waldecoder.poll_decode(); - if rec.is_err() { - // Assume that an error means we've reached the end of - // a partial WAL record. So that's ok. - trace!("WAL decoder error {:?}", rec); - break; - } - if let Some((lsn, recdata)) = rec.unwrap() { - // The previous record has been handled, let the repository know that - // it is up-to-date to this LSN. (We do this here on the "next" iteration, - // rather than right after the save_decoded_record, because at the end of - // the WAL, we will also need to perform the update of the checkpoint data - // with the same LSN as the last actual record.) - timeline.advance_last_record_lsn(last_lsn); - - let decoded = decode_wal_record(recdata.clone()); - save_decoded_record(&mut checkpoint, timeline, &decoded, recdata, lsn)?; - last_lsn = lsn; - } else { - break; - } - nrecords += 1; - } - - info!("imported {} records up to {}", nrecords, last_lsn); - - segno += 1; - offset = 0; - } - - if last_lsn != startpoint { - info!( - "reached end of WAL at {}, updating checkpoint info", - last_lsn - ); - let checkpoint_bytes = checkpoint.encode(); - timeline.put_page_image(RelishTag::Checkpoint, 0, last_lsn, checkpoint_bytes)?; - - timeline.advance_last_record_lsn(last_lsn); - } else { - info!("no WAL to import at {}", last_lsn); - } - - Ok(()) -} - /// /// Helper function to parse a WAL record and call the Timeline's PUT functions for all the /// relations/pages that the record affects. diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 0787e59f22..cba2207496 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -23,8 +23,6 @@ use postgres_types::PgLsn; use std::cmp::{max, min}; use std::collections::HashMap; use std::fs; -use std::fs::{File, OpenOptions}; -use std::io::{Seek, SeekFrom, Write}; use std::str::FromStr; use std::sync::Mutex; use std::thread; @@ -192,15 +190,6 @@ fn walreceiver_main( let endlsn = startlsn + data.len() as u64; let prev_last_rec_lsn = last_rec_lsn; - write_wal_file( - conf, - startlsn, - &timelineid, - pg_constants::WAL_SEGMENT_SIZE, - data, - &tenantid, - )?; - trace!("received XLogData between {} and {}", startlsn, endlsn); waldecoder.feed_bytes(data); @@ -403,98 +392,3 @@ pub fn identify_system(client: &mut Client) -> Result { Err(IdentifyError.into()) } } - -fn write_wal_file( - conf: &PageServerConf, - startpos: Lsn, - timelineid: &ZTimelineId, - wal_seg_size: usize, - buf: &[u8], - tenantid: &ZTenantId, -) -> anyhow::Result<()> { - let mut bytes_left: usize = buf.len(); - let mut bytes_written: usize = 0; - let mut partial; - let mut start_pos = startpos; - const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ]; - - let wal_dir = conf.wal_dir_path(timelineid, tenantid); - - /* Extract WAL location for this block */ - let mut xlogoff = start_pos.segment_offset(wal_seg_size); - - while bytes_left != 0 { - let bytes_to_write; - - /* - * If crossing a WAL boundary, only write up until we reach wal - * segment size. - */ - if xlogoff + bytes_left > wal_seg_size { - bytes_to_write = wal_seg_size - xlogoff; - } else { - bytes_to_write = bytes_left; - } - - /* Open file */ - let segno = start_pos.segment_number(wal_seg_size); - let wal_file_name = XLogFileName( - 1, // FIXME: always use Postgres timeline 1 - segno, - wal_seg_size, - ); - let wal_file_path = wal_dir.join(wal_file_name.clone()); - let wal_file_partial_path = wal_dir.join(wal_file_name.clone() + ".partial"); - - { - let mut wal_file: File; - /* Try to open already completed segment */ - if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path) { - wal_file = file; - partial = false; - } else if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_partial_path) { - /* Try to open existed partial file */ - wal_file = file; - partial = true; - } else { - /* Create and fill new partial file */ - partial = true; - match OpenOptions::new() - .create(true) - .write(true) - .open(&wal_file_partial_path) - { - Ok(mut file) => { - for _ in 0..(wal_seg_size / XLOG_BLCKSZ) { - file.write_all(ZERO_BLOCK)?; - } - wal_file = file; - } - Err(e) => { - error!("Failed to open log file {:?}: {}", &wal_file_path, e); - return Err(e.into()); - } - } - } - wal_file.seek(SeekFrom::Start(xlogoff as u64))?; - wal_file.write_all(&buf[bytes_written..(bytes_written + bytes_to_write)])?; - - // FIXME: Flush the file - //wal_file.sync_all()?; - } - /* Write was successful, advance our position */ - bytes_written += bytes_to_write; - bytes_left -= bytes_to_write; - start_pos += bytes_to_write as u64; - xlogoff += bytes_to_write; - - /* Did we reach the end of a WAL segment? */ - if start_pos.segment_offset(wal_seg_size) == 0 { - xlogoff = 0; - if partial { - fs::rename(&wal_file_partial_path, &wal_file_path)?; - } - } - } - Ok(()) -}