Fix bugs in wal_acceptor WAL parser

This commit is contained in:
Konstantin Knizhnik
2021-04-02 20:12:59 +03:00
parent 4566a0a160
commit 6eabe17e98
3 changed files with 65 additions and 43 deletions

View File

@@ -47,7 +47,7 @@ pub enum StartupRequestCode {
impl FeStartupMessage {
pub fn parse(buf: &mut BytesMut) -> Result<Option<FeMessage>> {
const MAX_STARTUP_PACKET_LENGTH: u32 = 10000;
const MAX_STARTUP_PACKET_LENGTH: usize = 10000;
const CANCEL_REQUEST_CODE: u32 = (1234 << 16) | 5678;
const NEGOTIATE_SSL_CODE: u32 = (1234 << 16) | 5679;
const NEGOTIATE_GSS_CODE: u32 = (1234 << 16) | 5680;
@@ -55,14 +55,17 @@ impl FeStartupMessage {
if buf.len() < 4 {
return Ok(None);
}
let len = BigEndian::read_u32(&buf[0..4]);
let len = BigEndian::read_u32(&buf[0..4]) as usize;
if len < 4 || len as u32 > MAX_STARTUP_PACKET_LENGTH {
if len < 4 || len > MAX_STARTUP_PACKET_LENGTH {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"invalid message length",
));
}
if buf.len() < len {
return Ok(None);
}
let version = BigEndian::read_u32(&buf[4..8]);

View File

@@ -457,7 +457,6 @@ impl WalAcceptor
panic!("Incompatible format version: {} vs. {}",
my_info.format_version, SK_FORMAT_VERSION);
}
let mut shared_state = self.mutex.lock().unwrap();
shared_state.info = my_info;
}
},
@@ -468,12 +467,12 @@ impl WalAcceptor
}
fn save_control_file(&self, sync : bool) -> Result<()> {
let mut buf = BytesMut::new();
let mut shared_state = self.mutex.lock().unwrap();
shared_state.info.pack(&mut buf);
let file = shared_state.control_file.as_mut().unwrap();
file.seek(SeekFrom::Start(0))?;
let mut buf = BytesMut::new();
let my_info = self.get_info();
my_info.pack(&mut buf);
file.write_all(&mut buf[..])?;
if sync {
file.sync_all()?;
@@ -487,6 +486,7 @@ impl WalAcceptor
match listener.accept().await {
Ok((socket, peer_addr)) => {
debug!("accepted connection from {}", peer_addr);
socket.set_nodelay(true)?;
let mut conn = Connection::new(self, socket, &conf);
task::spawn(async move {
if let Err(err) = conn.run().await {
@@ -537,6 +537,7 @@ impl Connection {
let mut my_info = self.acceptor.get_info();
// Receive information about server
let server_info = self.read_req::<ServerInfo>().await?;
info!("Start handshake with wal_proposer {}", self.stream.peer_addr()?);
/* Check protocol compatibility */
if server_info.protocol_version != SK_PROTOCOL_VERSION {
@@ -586,9 +587,11 @@ impl Connection {
/* Acknowledge the proposed candidate by returning it to the proxy */
self.start_sending();
prop.pack(&mut self.outbuf);
prop.node_id.pack(&mut self.outbuf);
self.send().await?;
info!("Start streaming from server {} address {:?}", server_info.system_id, self.stream.peer_addr()?);
// Main loop
loop {
let mut sync_control_file = false;
@@ -644,6 +647,7 @@ impl Connection {
}
/* Report flush position */
//info!("Confirm LSN: {:X}/{:>08X}", (end_pos>>32) as u32, end_pos as u32);
let resp = SafeKeeperResponse {
epoch: my_info.epoch,
flush_lsn: end_pos,
@@ -710,6 +714,7 @@ impl Connection {
// Send WAL to replica or WAL sender using standard libpq replication protocol
//
async fn send_wal(&mut self) -> Result<()> {
info!("WAL sender to {:?} is started", self.stream.peer_addr()?);
loop {
self.start_sending();
match self.read_message().await? {
@@ -719,6 +724,7 @@ impl Connection {
match m.kind {
StartupRequestCode::NegotiateGss | StartupRequestCode::NegotiateSsl => {
BeMessage::write(&mut self.outbuf, &BeMessage::Negotiate);
info!("SSL requested");
self.send().await?;
}
StartupRequestCode::Normal => {
@@ -731,7 +737,9 @@ impl Connection {
}
},
Some(FeMessage::Query(m)) => {
self.process_query(&m).await?;
if !self.process_query(&m).await? {
break;
}
},
Some(FeMessage::Terminate) => {
break;
@@ -745,13 +753,14 @@ impl Connection {
}
}
}
info!("WAL sender to {:?} is finished", self.stream.peer_addr()?);
Ok(())
}
//
// Handle IDENTIFY_SYSTEM replication command
//
async fn handle_identify_system(&mut self) -> Result<()> {
async fn handle_identify_system(&mut self) -> Result<bool> {
let (start_pos,timeline) = self.find_end_of_wal(false);
let lsn = format!("{:X}/{:>08X}", (start_pos>>32) as u32, start_pos as u32);
let tli = timeline.to_string();
@@ -776,13 +785,14 @@ impl Connection {
BeMessage::write(&mut self.outbuf, &BeMessage::DataRow(&[Some(lsn_bytes),Some(tli_bytes),Some(sysid_bytes),None]));
BeMessage::write(&mut self.outbuf, &BeMessage::CommandComplete(b"IDENTIFY_SYSTEM"));
BeMessage::write(&mut self.outbuf, &BeMessage::ReadyForQuery);
self.send().await
self.send().await?;
Ok(true)
}
//
// Handle START_REPLICATION replication command
//
async fn handle_start_replication(&mut self, cmd: &Bytes) -> Result<()> {
async fn handle_start_replication(&mut self, cmd: &Bytes) -> Result<bool> {
let re = Regex::new(r"([[:xdigit:]]*)/([[:xdigit:]]*)").unwrap();
let mut caps = re.captures_iter(str::from_utf8(&cmd[..]).unwrap());
let cap = caps.next().unwrap();
@@ -800,6 +810,9 @@ impl Connection {
if start_pos == 0 {
start_pos = wal_end;
}
info!("Start replication from {:X}/{:>08X} till {:X}/{:>08X}",
(start_pos>>32) as u32, start_pos as u32,
(stop_pos>>32) as u32, stop_pos as u32);
BeMessage::write(&mut self.outbuf, &BeMessage::Copy);
self.send().await?;
@@ -811,13 +824,12 @@ impl Connection {
let mut end_pos : XLogRecPtr;
let mut commit_lsn : XLogRecPtr;
let mut wal_file : Option<File> = None;
let mut buf = [0u8; LIBPQ_HDR_SIZE + XLOG_HDR_SIZE + MAX_SEND_SIZE];
self.outbuf.resize(LIBPQ_HDR_SIZE + XLOG_HDR_SIZE + MAX_SEND_SIZE, 0u8);
loop {
/* Wait until we have some data to stream */
if stop_pos != 0 {
/* recovery mode: stream up to the specified LSN (VCL) */
if start_pos > stop_pos {
if start_pos >= stop_pos {
/* recovery finished */
break;
}
@@ -850,10 +862,9 @@ impl Connection {
self.acceptor.add_hs_feedback(HotStandbyFeedback::parse(&m.body)),
_ => {}
}
}
},
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
continue;
}
},
Err(e) => {
return Err(e.into());
}
@@ -885,25 +896,25 @@ impl Connection {
let msg_size = LIBPQ_HDR_SIZE + XLOG_HDR_SIZE + send_size;
let data_start = LIBPQ_HDR_SIZE + XLOG_HDR_SIZE;
let data_end = data_start + send_size;
file.read_exact(&mut buf[data_start..data_end])?;
buf[0] = b'd';
BigEndian::write_u32(&mut buf[1..5], (msg_size - LIBPQ_MSG_SIZE_OFFS) as u32);
buf[5] = b'w';
BigEndian::write_u64(&mut buf[6..14], start_pos);
BigEndian::write_u64(&mut buf[14..22], end_pos);
BigEndian::write_u64(&mut buf[22..30], get_current_timestamp());
file.read_exact(&mut self.outbuf[data_start..data_end])?;
self.outbuf[0] = b'd';
BigEndian::write_u32(&mut self.outbuf[1..5], (msg_size - LIBPQ_MSG_SIZE_OFFS) as u32);
self.outbuf[5] = b'w';
BigEndian::write_u64(&mut self.outbuf[6..14], start_pos);
BigEndian::write_u64(&mut self.outbuf[14..22], end_pos);
BigEndian::write_u64(&mut self.outbuf[22..30], get_current_timestamp());
self.stream.write_all(&buf[0..msg_size]).await?;
self.stream.write_all(&self.outbuf[0..msg_size]).await?;
start_pos += send_size as u64;
if XLogSegmentOffset(start_pos, wal_seg_size) != 0 {
wal_file = Some(file);
}
}
Ok(())
Ok(false)
}
async fn process_query(&mut self, q : &FeQueryMessage) -> Result<()> {
async fn process_query(&mut self, q : &FeQueryMessage) -> Result<bool> {
trace!("got query {:?}", q.body);
if q.body.starts_with(b"IDENTIFY_SYSTEM") {
@@ -957,7 +968,7 @@ impl Connection {
} else {
/* Create and fill new partial file */
partial = true;
match OpenOptions::new().create(true).write(true).open(&wal_file_path) {
match OpenOptions::new().create(true).write(true).open(&wal_file_partial_path) {
Ok(mut file) => {
for _ in 0..(wal_seg_size/XLOG_BLCKSZ) {
file.write_all(&ZERO_BLOCK)?;
@@ -973,7 +984,7 @@ impl Connection {
wal_file.seek(SeekFrom::Start(xlogoff as u64))?;
wal_file.write_all(&buf[bytes_written..(bytes_written + bytes_to_write)])?;
// Fluh file is not prohibited
// Flush file is not prohibited
if !self.conf.no_sync {
wal_file.sync_all()?;
}

View File

@@ -5,14 +5,17 @@ use std::cmp::min;
use std::io::prelude::*;
use byteorder::{LittleEndian, ByteOrder};
use crc32c::*;
use log::*;
pub const XLOG_FNAME_LEN : usize = 24;
pub const XLOG_BLCKSZ : usize = 8192;
pub const XLOG_SIZE_OF_XLOG_SHORT_PHD : usize = 2+2+4+8+4 + 4;
pub const XLOG_SIZE_OF_XLOG_LONG_PHD : usize = (2+2+4+8+4) + 4 + 8 + 4 + 4;
pub const XLP_FIRST_IS_CONTRECORD : u16 = 0x0001;
pub const XLOG_PAGE_MAGIC : u16 = 0xD109;
pub const XLP_REM_LEN_OFFS : usize = 2+2+4+8;
pub const XLOG_SIZE_OF_XLOG_SHORT_PHD : usize = XLP_REM_LEN_OFFS + 4 + 4;
pub const XLOG_SIZE_OF_XLOG_LONG_PHD : usize = XLOG_SIZE_OF_XLOG_SHORT_PHD + 8 + 4 + 4;
pub const XLOG_RECORD_CRC_OFFS : usize = 4+4+8+1+1+2;
pub const XLOG_SIZE_OF_XLOG_RECORD : usize = XLOG_RECORD_CRC_OFFS+4;
pub type XLogRecPtr = u64;
pub type TimeLineID = u32;
pub type TimestampTz = u64;
@@ -80,7 +83,6 @@ pub fn get_current_timestamp() -> TimestampTz
fn find_end_of_wal_segment(data_dir: &PathBuf, segno: XLogSegNo, tli: TimeLineID, wal_seg_size: usize) -> u32 {
let mut offs : usize = 0;
let mut padlen : usize = 0;
let mut contlen : usize = 0;
let mut wal_crc : u32 = 0;
let mut crc : u32 = 0;
@@ -90,6 +92,7 @@ fn find_end_of_wal_segment(data_dir: &PathBuf, segno: XLogSegNo, tli: TimeLineID
let mut last_valid_rec_pos : usize = 0;
let mut file = File::open(data_dir.join(file_name.clone() + ".partial")).unwrap();
let mut rec_hdr = [0u8; XLOG_RECORD_CRC_OFFS];
while offs < wal_seg_size {
if offs % XLOG_BLCKSZ == 0 {
if let Ok(bytes_read) = file.read(&mut buf) {
@@ -99,14 +102,21 @@ fn find_end_of_wal_segment(data_dir: &PathBuf, segno: XLogSegNo, tli: TimeLineID
} else {
break;
}
let xlp_magic = LittleEndian::read_u16(&buf[0..2]);
let xlp_info = LittleEndian::read_u16(&buf[2..4]);
let xlp_rem_len = LittleEndian::read_u32(&buf[XLP_REM_LEN_OFFS..XLP_REM_LEN_OFFS+4]);
if xlp_magic != XLOG_PAGE_MAGIC {
info!("Invalid WAL file {}.partial magic {}", file_name, xlp_magic);
break;
}
if offs == 0 {
offs = XLOG_SIZE_OF_XLOG_LONG_PHD;
if (xlp_info & XLP_FIRST_IS_CONTRECORD) != 0 {
offs += ((xlp_rem_len + 7) & !7) as usize;
}
} else {
offs += XLOG_SIZE_OF_XLOG_SHORT_PHD;
}
} else if padlen > 0 {
offs += padlen;
padlen = 0;
} else if contlen == 0 {
let page_offs = offs % XLOG_BLCKSZ;
let xl_tot_len = LittleEndian::read_u32(&buf[page_offs..page_offs+4]) as usize;
@@ -129,7 +139,7 @@ fn find_end_of_wal_segment(data_dir: &PathBuf, segno: XLogSegNo, tli: TimeLineID
let len = min(XLOG_RECORD_CRC_OFFS-rec_offs, n);
rec_hdr[rec_offs..rec_offs+len].copy_from_slice(&buf[page_offs..page_offs+len]);
}
if rec_offs < XLOG_SIZE_OF_XLOG_RECORD && rec_offs + n >= XLOG_SIZE_OF_XLOG_RECORD {
if rec_offs <= XLOG_RECORD_CRC_OFFS && rec_offs + n >= XLOG_SIZE_OF_XLOG_RECORD {
let crc_offs = page_offs - rec_offs + XLOG_RECORD_CRC_OFFS;
wal_crc = LittleEndian::read_u32(&buf[crc_offs..crc_offs+4]);
crc = crc32c_append(0, &buf[crc_offs+4..page_offs+n]);
@@ -146,13 +156,11 @@ fn find_end_of_wal_segment(data_dir: &PathBuf, segno: XLogSegNo, tli: TimeLineID
if contlen == 0 {
crc = !crc;
crc = crc32c_append(crc, &rec_hdr);
if offs % 8 != 0 {
padlen = 8 - (offs % 8);
}
offs = (offs + 7) & !7; // pad on 8 bytes boundary */
if crc == wal_crc {
last_valid_rec_pos = offs + padlen;
last_valid_rec_pos = offs;
} else {
println!("CRC mismatch {} vs {}", crc, wal_crc);
info!("CRC mismatch {} vs {} at {}", crc, wal_crc, last_valid_rec_pos);
break;
}
}