diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index ce804aacf4..9db3421dfd 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -22,8 +22,6 @@ use postgres_types::PgLsn; use std::cmp::{max, min}; use std::collections::HashMap; use std::fs; -use std::fs::{File, OpenOptions}; -use std::io::{Seek, SeekFrom, Write}; use std::str::FromStr; use std::sync::Mutex; use std::thread; @@ -194,15 +192,6 @@ fn walreceiver_main( let endlsn = startlsn + data.len() as u64; let prev_last_rec_lsn = last_rec_lsn; - write_wal_file( - conf, - startlsn, - &timelineid, - pg_constants::WAL_SEGMENT_SIZE, - data, - tenantid, - )?; - trace!("received XLogData between {} and {}", startlsn, endlsn); waldecoder.feed_bytes(data); @@ -236,7 +225,9 @@ fn walreceiver_main( Ok(None) => { trace!( "End of replication stream {}..{} at {}", - startlsn, endlsn, last_rec_lsn + startlsn, + endlsn, + last_rec_lsn ); break; } @@ -420,98 +411,3 @@ pub fn identify_system(client: &mut Client) -> Result { Err(IdentifyError.into()) } } - -fn write_wal_file( - conf: &PageServerConf, - startpos: Lsn, - timelineid: &ZTimelineId, - wal_seg_size: usize, - buf: &[u8], - tenantid: &ZTenantId, -) -> anyhow::Result<()> { - let mut bytes_left: usize = buf.len(); - let mut bytes_written: usize = 0; - let mut partial; - let mut start_pos = startpos; - const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ]; - - let wal_dir = conf.wal_dir_path(timelineid, tenantid); - - /* Extract WAL location for this block */ - let mut xlogoff = start_pos.segment_offset(wal_seg_size); - - while bytes_left != 0 { - let bytes_to_write; - - /* - * If crossing a WAL boundary, only write up until we reach wal - * segment size. - */ - if xlogoff + bytes_left > wal_seg_size { - bytes_to_write = wal_seg_size - xlogoff; - } else { - bytes_to_write = bytes_left; - } - - /* Open file */ - let segno = start_pos.segment_number(wal_seg_size); - let wal_file_name = XLogFileName( - 1, // FIXME: always use Postgres timeline 1 - segno, - wal_seg_size, - ); - let wal_file_path = wal_dir.join(wal_file_name.clone()); - let wal_file_partial_path = wal_dir.join(wal_file_name.clone() + ".partial"); - - { - let mut wal_file: File; - /* Try to open already completed segment */ - if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path) { - wal_file = file; - partial = false; - } else if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_partial_path) { - /* Try to open existed partial file */ - wal_file = file; - partial = true; - } else { - /* Create and fill new partial file */ - partial = true; - match OpenOptions::new() - .create(true) - .write(true) - .open(&wal_file_partial_path) - { - Ok(mut file) => { - for _ in 0..(wal_seg_size / XLOG_BLCKSZ) { - file.write_all(&ZERO_BLOCK)?; - } - wal_file = file; - } - Err(e) => { - error!("Failed to open log file {:?}: {}", &wal_file_path, e); - return Err(e.into()); - } - } - } - wal_file.seek(SeekFrom::Start(xlogoff as u64))?; - wal_file.write_all(&buf[bytes_written..(bytes_written + bytes_to_write)])?; - - // FIXME: Flush the file - //wal_file.sync_all()?; - } - /* Write was successful, advance our position */ - bytes_written += bytes_to_write; - bytes_left -= bytes_to_write; - start_pos += bytes_to_write as u64; - xlogoff += bytes_to_write; - - /* Did we reach the end of a WAL segment? */ - if start_pos.segment_offset(wal_seg_size) == 0 { - xlogoff = 0; - if partial { - fs::rename(&wal_file_partial_path, &wal_file_path)?; - } - } - } - Ok(()) -} diff --git a/vendor/postgres b/vendor/postgres index d125cc56b8..9932d259be 160000 --- a/vendor/postgres +++ b/vendor/postgres @@ -1 +1 @@ -Subproject commit d125cc56b8f493a5d3886a4534e4b1a6db198a8f +Subproject commit 9932d259becfd367e12e26a38caa1537843f14e0 diff --git a/walkeeper/src/receive_wal.rs b/walkeeper/src/receive_wal.rs index aa3acf05fe..a8d07ee0a8 100644 --- a/walkeeper/src/receive_wal.rs +++ b/walkeeper/src/receive_wal.rs @@ -324,10 +324,7 @@ impl<'pg> ReceiveWalConn<'pg> { info!( "Start accepting WAL for timeline {} tenant {} address {:?} flush_lsn={}", - server_info.timeline_id, - server_info.tenant_id, - self.peer_addr, - my_info.flush_lsn, + server_info.timeline_id, server_info.tenant_id, self.peer_addr, my_info.flush_lsn, ); let mut last_rec_lsn = Lsn(0); let mut decoder = WalStreamDecoder::new(last_rec_lsn, false); @@ -408,7 +405,7 @@ impl<'pg> ReceiveWalConn<'pg> { * maximum (vcl) determined by WAL proposer during handshake. * Switching epoch means that node completes recovery and start writing in the WAL new data. */ - if my_info.epoch < prop.epoch && end_pos > max(my_info.flush_lsn, prop.vcl) { + if my_info.epoch < prop.epoch && end_pos >= max(my_info.flush_lsn, prop.vcl) { info!("Switch to new epoch {}", prop.epoch); my_info.epoch = prop.epoch; /* bump epoch */ sync_control_file = true;