Disable write WAL to files at pageserver

This commit is contained in:
Konstantin Knizhnik
2021-08-31 11:13:55 +03:00
parent 73d823e53c
commit 26060dd68e
3 changed files with 6 additions and 113 deletions

View File

@@ -22,8 +22,6 @@ use postgres_types::PgLsn;
use std::cmp::{max, min};
use std::collections::HashMap;
use std::fs;
use std::fs::{File, OpenOptions};
use std::io::{Seek, SeekFrom, Write};
use std::str::FromStr;
use std::sync::Mutex;
use std::thread;
@@ -194,15 +192,6 @@ fn walreceiver_main(
let endlsn = startlsn + data.len() as u64;
let prev_last_rec_lsn = last_rec_lsn;
write_wal_file(
conf,
startlsn,
&timelineid,
pg_constants::WAL_SEGMENT_SIZE,
data,
tenantid,
)?;
trace!("received XLogData between {} and {}", startlsn, endlsn);
waldecoder.feed_bytes(data);
@@ -236,7 +225,9 @@ fn walreceiver_main(
Ok(None) => {
trace!(
"End of replication stream {}..{} at {}",
startlsn, endlsn, last_rec_lsn
startlsn,
endlsn,
last_rec_lsn
);
break;
}
@@ -420,98 +411,3 @@ pub fn identify_system(client: &mut Client) -> Result<IdentifySystem, Error> {
Err(IdentifyError.into())
}
}
fn write_wal_file(
conf: &PageServerConf,
startpos: Lsn,
timelineid: &ZTimelineId,
wal_seg_size: usize,
buf: &[u8],
tenantid: &ZTenantId,
) -> anyhow::Result<()> {
let mut bytes_left: usize = buf.len();
let mut bytes_written: usize = 0;
let mut partial;
let mut start_pos = startpos;
const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ];
let wal_dir = conf.wal_dir_path(timelineid, tenantid);
/* Extract WAL location for this block */
let mut xlogoff = start_pos.segment_offset(wal_seg_size);
while bytes_left != 0 {
let bytes_to_write;
/*
* If crossing a WAL boundary, only write up until we reach wal
* segment size.
*/
if xlogoff + bytes_left > wal_seg_size {
bytes_to_write = wal_seg_size - xlogoff;
} else {
bytes_to_write = bytes_left;
}
/* Open file */
let segno = start_pos.segment_number(wal_seg_size);
let wal_file_name = XLogFileName(
1, // FIXME: always use Postgres timeline 1
segno,
wal_seg_size,
);
let wal_file_path = wal_dir.join(wal_file_name.clone());
let wal_file_partial_path = wal_dir.join(wal_file_name.clone() + ".partial");
{
let mut wal_file: File;
/* Try to open already completed segment */
if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path) {
wal_file = file;
partial = false;
} else if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_partial_path) {
/* Try to open existed partial file */
wal_file = file;
partial = true;
} else {
/* Create and fill new partial file */
partial = true;
match OpenOptions::new()
.create(true)
.write(true)
.open(&wal_file_partial_path)
{
Ok(mut file) => {
for _ in 0..(wal_seg_size / XLOG_BLCKSZ) {
file.write_all(&ZERO_BLOCK)?;
}
wal_file = file;
}
Err(e) => {
error!("Failed to open log file {:?}: {}", &wal_file_path, e);
return Err(e.into());
}
}
}
wal_file.seek(SeekFrom::Start(xlogoff as u64))?;
wal_file.write_all(&buf[bytes_written..(bytes_written + bytes_to_write)])?;
// FIXME: Flush the file
//wal_file.sync_all()?;
}
/* Write was successful, advance our position */
bytes_written += bytes_to_write;
bytes_left -= bytes_to_write;
start_pos += bytes_to_write as u64;
xlogoff += bytes_to_write;
/* Did we reach the end of a WAL segment? */
if start_pos.segment_offset(wal_seg_size) == 0 {
xlogoff = 0;
if partial {
fs::rename(&wal_file_partial_path, &wal_file_path)?;
}
}
}
Ok(())
}

View File

@@ -324,10 +324,7 @@ impl<'pg> ReceiveWalConn<'pg> {
info!(
"Start accepting WAL for timeline {} tenant {} address {:?} flush_lsn={}",
server_info.timeline_id,
server_info.tenant_id,
self.peer_addr,
my_info.flush_lsn,
server_info.timeline_id, server_info.tenant_id, self.peer_addr, my_info.flush_lsn,
);
let mut last_rec_lsn = Lsn(0);
let mut decoder = WalStreamDecoder::new(last_rec_lsn, false);
@@ -408,7 +405,7 @@ impl<'pg> ReceiveWalConn<'pg> {
* maximum (vcl) determined by WAL proposer during handshake.
* Switching epoch means that node completes recovery and start writing in the WAL new data.
*/
if my_info.epoch < prop.epoch && end_pos > max(my_info.flush_lsn, prop.vcl) {
if my_info.epoch < prop.epoch && end_pos >= max(my_info.flush_lsn, prop.vcl) {
info!("Switch to new epoch {}", prop.epoch);
my_info.epoch = prop.epoch; /* bump epoch */
sync_control_file = true;