From 6eabe17e9874185f7de159f664c0793610a1f350 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Fri, 2 Apr 2021 20:12:59 +0300 Subject: [PATCH] Fix bugs in wal_acceptor WAL parser --- src/pq_protocol.rs | 9 ++++--- src/wal_service.rs | 65 +++++++++++++++++++++++++++------------------- src/xlog_utils.rs | 34 ++++++++++++++---------- 3 files changed, 65 insertions(+), 43 deletions(-) diff --git a/src/pq_protocol.rs b/src/pq_protocol.rs index cd0c5bca3d..ce7b282eb6 100644 --- a/src/pq_protocol.rs +++ b/src/pq_protocol.rs @@ -47,7 +47,7 @@ pub enum StartupRequestCode { impl FeStartupMessage { pub fn parse(buf: &mut BytesMut) -> Result> { - const MAX_STARTUP_PACKET_LENGTH: u32 = 10000; + const MAX_STARTUP_PACKET_LENGTH: usize = 10000; const CANCEL_REQUEST_CODE: u32 = (1234 << 16) | 5678; const NEGOTIATE_SSL_CODE: u32 = (1234 << 16) | 5679; const NEGOTIATE_GSS_CODE: u32 = (1234 << 16) | 5680; @@ -55,14 +55,17 @@ impl FeStartupMessage { if buf.len() < 4 { return Ok(None); } - let len = BigEndian::read_u32(&buf[0..4]); + let len = BigEndian::read_u32(&buf[0..4]) as usize; - if len < 4 || len as u32 > MAX_STARTUP_PACKET_LENGTH { + if len < 4 || len > MAX_STARTUP_PACKET_LENGTH { return Err(io::Error::new( io::ErrorKind::InvalidData, "invalid message length", )); } + if buf.len() < len { + return Ok(None); + } let version = BigEndian::read_u32(&buf[4..8]); diff --git a/src/wal_service.rs b/src/wal_service.rs index c4751d8771..cea05f1e35 100644 --- a/src/wal_service.rs +++ b/src/wal_service.rs @@ -457,7 +457,6 @@ impl WalAcceptor panic!("Incompatible format version: {} vs. {}", my_info.format_version, SK_FORMAT_VERSION); } - let mut shared_state = self.mutex.lock().unwrap(); shared_state.info = my_info; } }, @@ -468,12 +467,12 @@ impl WalAcceptor } fn save_control_file(&self, sync : bool) -> Result<()> { + let mut buf = BytesMut::new(); let mut shared_state = self.mutex.lock().unwrap(); + shared_state.info.pack(&mut buf); + let file = shared_state.control_file.as_mut().unwrap(); file.seek(SeekFrom::Start(0))?; - let mut buf = BytesMut::new(); - let my_info = self.get_info(); - my_info.pack(&mut buf); file.write_all(&mut buf[..])?; if sync { file.sync_all()?; @@ -487,6 +486,7 @@ impl WalAcceptor match listener.accept().await { Ok((socket, peer_addr)) => { debug!("accepted connection from {}", peer_addr); + socket.set_nodelay(true)?; let mut conn = Connection::new(self, socket, &conf); task::spawn(async move { if let Err(err) = conn.run().await { @@ -537,6 +537,7 @@ impl Connection { let mut my_info = self.acceptor.get_info(); // Receive information about server let server_info = self.read_req::().await?; + info!("Start handshake with wal_proposer {}", self.stream.peer_addr()?); /* Check protocol compatibility */ if server_info.protocol_version != SK_PROTOCOL_VERSION { @@ -586,9 +587,11 @@ impl Connection { /* Acknowledge the proposed candidate by returning it to the proxy */ self.start_sending(); - prop.pack(&mut self.outbuf); + prop.node_id.pack(&mut self.outbuf); self.send().await?; + info!("Start streaming from server {} address {:?}", server_info.system_id, self.stream.peer_addr()?); + // Main loop loop { let mut sync_control_file = false; @@ -644,6 +647,7 @@ impl Connection { } /* Report flush position */ + //info!("Confirm LSN: {:X}/{:>08X}", (end_pos>>32) as u32, end_pos as u32); let resp = SafeKeeperResponse { epoch: my_info.epoch, flush_lsn: end_pos, @@ -710,6 +714,7 @@ impl Connection { // Send WAL to replica or WAL sender using standard libpq replication protocol // async fn send_wal(&mut self) -> Result<()> { + info!("WAL sender to {:?} is started", self.stream.peer_addr()?); loop { self.start_sending(); match self.read_message().await? { @@ -719,6 +724,7 @@ impl Connection { match m.kind { StartupRequestCode::NegotiateGss | StartupRequestCode::NegotiateSsl => { BeMessage::write(&mut self.outbuf, &BeMessage::Negotiate); + info!("SSL requested"); self.send().await?; } StartupRequestCode::Normal => { @@ -731,7 +737,9 @@ impl Connection { } }, Some(FeMessage::Query(m)) => { - self.process_query(&m).await?; + if !self.process_query(&m).await? { + break; + } }, Some(FeMessage::Terminate) => { break; @@ -745,13 +753,14 @@ impl Connection { } } } + info!("WAL sender to {:?} is finished", self.stream.peer_addr()?); Ok(()) } // // Handle IDENTIFY_SYSTEM replication command // - async fn handle_identify_system(&mut self) -> Result<()> { + async fn handle_identify_system(&mut self) -> Result { let (start_pos,timeline) = self.find_end_of_wal(false); let lsn = format!("{:X}/{:>08X}", (start_pos>>32) as u32, start_pos as u32); let tli = timeline.to_string(); @@ -776,13 +785,14 @@ impl Connection { BeMessage::write(&mut self.outbuf, &BeMessage::DataRow(&[Some(lsn_bytes),Some(tli_bytes),Some(sysid_bytes),None])); BeMessage::write(&mut self.outbuf, &BeMessage::CommandComplete(b"IDENTIFY_SYSTEM")); BeMessage::write(&mut self.outbuf, &BeMessage::ReadyForQuery); - self.send().await + self.send().await?; + Ok(true) } // // Handle START_REPLICATION replication command // - async fn handle_start_replication(&mut self, cmd: &Bytes) -> Result<()> { + async fn handle_start_replication(&mut self, cmd: &Bytes) -> Result { let re = Regex::new(r"([[:xdigit:]]*)/([[:xdigit:]]*)").unwrap(); let mut caps = re.captures_iter(str::from_utf8(&cmd[..]).unwrap()); let cap = caps.next().unwrap(); @@ -800,6 +810,9 @@ impl Connection { if start_pos == 0 { start_pos = wal_end; } + info!("Start replication from {:X}/{:>08X} till {:X}/{:>08X}", + (start_pos>>32) as u32, start_pos as u32, + (stop_pos>>32) as u32, stop_pos as u32); BeMessage::write(&mut self.outbuf, &BeMessage::Copy); self.send().await?; @@ -811,13 +824,12 @@ impl Connection { let mut end_pos : XLogRecPtr; let mut commit_lsn : XLogRecPtr; let mut wal_file : Option = None; - let mut buf = [0u8; LIBPQ_HDR_SIZE + XLOG_HDR_SIZE + MAX_SEND_SIZE]; - + self.outbuf.resize(LIBPQ_HDR_SIZE + XLOG_HDR_SIZE + MAX_SEND_SIZE, 0u8); loop { /* Wait until we have some data to stream */ if stop_pos != 0 { /* recovery mode: stream up to the specified LSN (VCL) */ - if start_pos > stop_pos { + if start_pos >= stop_pos { /* recovery finished */ break; } @@ -850,10 +862,9 @@ impl Connection { self.acceptor.add_hs_feedback(HotStandbyFeedback::parse(&m.body)), _ => {} } - } + }, Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - continue; - } + }, Err(e) => { return Err(e.into()); } @@ -885,25 +896,25 @@ impl Connection { let msg_size = LIBPQ_HDR_SIZE + XLOG_HDR_SIZE + send_size; let data_start = LIBPQ_HDR_SIZE + XLOG_HDR_SIZE; let data_end = data_start + send_size; - file.read_exact(&mut buf[data_start..data_end])?; - buf[0] = b'd'; - BigEndian::write_u32(&mut buf[1..5], (msg_size - LIBPQ_MSG_SIZE_OFFS) as u32); - buf[5] = b'w'; - BigEndian::write_u64(&mut buf[6..14], start_pos); - BigEndian::write_u64(&mut buf[14..22], end_pos); - BigEndian::write_u64(&mut buf[22..30], get_current_timestamp()); + file.read_exact(&mut self.outbuf[data_start..data_end])?; + self.outbuf[0] = b'd'; + BigEndian::write_u32(&mut self.outbuf[1..5], (msg_size - LIBPQ_MSG_SIZE_OFFS) as u32); + self.outbuf[5] = b'w'; + BigEndian::write_u64(&mut self.outbuf[6..14], start_pos); + BigEndian::write_u64(&mut self.outbuf[14..22], end_pos); + BigEndian::write_u64(&mut self.outbuf[22..30], get_current_timestamp()); - self.stream.write_all(&buf[0..msg_size]).await?; + self.stream.write_all(&self.outbuf[0..msg_size]).await?; start_pos += send_size as u64; if XLogSegmentOffset(start_pos, wal_seg_size) != 0 { wal_file = Some(file); } } - Ok(()) + Ok(false) } - async fn process_query(&mut self, q : &FeQueryMessage) -> Result<()> { + async fn process_query(&mut self, q : &FeQueryMessage) -> Result { trace!("got query {:?}", q.body); if q.body.starts_with(b"IDENTIFY_SYSTEM") { @@ -957,7 +968,7 @@ impl Connection { } else { /* Create and fill new partial file */ partial = true; - match OpenOptions::new().create(true).write(true).open(&wal_file_path) { + match OpenOptions::new().create(true).write(true).open(&wal_file_partial_path) { Ok(mut file) => { for _ in 0..(wal_seg_size/XLOG_BLCKSZ) { file.write_all(&ZERO_BLOCK)?; @@ -973,7 +984,7 @@ impl Connection { wal_file.seek(SeekFrom::Start(xlogoff as u64))?; wal_file.write_all(&buf[bytes_written..(bytes_written + bytes_to_write)])?; - // Fluh file is not prohibited + // Flush file is not prohibited if !self.conf.no_sync { wal_file.sync_all()?; } diff --git a/src/xlog_utils.rs b/src/xlog_utils.rs index 0960e45a9b..b60a353913 100644 --- a/src/xlog_utils.rs +++ b/src/xlog_utils.rs @@ -5,14 +5,17 @@ use std::cmp::min; use std::io::prelude::*; use byteorder::{LittleEndian, ByteOrder}; use crc32c::*; +use log::*; pub const XLOG_FNAME_LEN : usize = 24; pub const XLOG_BLCKSZ : usize = 8192; -pub const XLOG_SIZE_OF_XLOG_SHORT_PHD : usize = 2+2+4+8+4 + 4; -pub const XLOG_SIZE_OF_XLOG_LONG_PHD : usize = (2+2+4+8+4) + 4 + 8 + 4 + 4; +pub const XLP_FIRST_IS_CONTRECORD : u16 = 0x0001; +pub const XLOG_PAGE_MAGIC : u16 = 0xD109; +pub const XLP_REM_LEN_OFFS : usize = 2+2+4+8; +pub const XLOG_SIZE_OF_XLOG_SHORT_PHD : usize = XLP_REM_LEN_OFFS + 4 + 4; +pub const XLOG_SIZE_OF_XLOG_LONG_PHD : usize = XLOG_SIZE_OF_XLOG_SHORT_PHD + 8 + 4 + 4; pub const XLOG_RECORD_CRC_OFFS : usize = 4+4+8+1+1+2; pub const XLOG_SIZE_OF_XLOG_RECORD : usize = XLOG_RECORD_CRC_OFFS+4; - pub type XLogRecPtr = u64; pub type TimeLineID = u32; pub type TimestampTz = u64; @@ -80,7 +83,6 @@ pub fn get_current_timestamp() -> TimestampTz fn find_end_of_wal_segment(data_dir: &PathBuf, segno: XLogSegNo, tli: TimeLineID, wal_seg_size: usize) -> u32 { let mut offs : usize = 0; - let mut padlen : usize = 0; let mut contlen : usize = 0; let mut wal_crc : u32 = 0; let mut crc : u32 = 0; @@ -90,6 +92,7 @@ fn find_end_of_wal_segment(data_dir: &PathBuf, segno: XLogSegNo, tli: TimeLineID let mut last_valid_rec_pos : usize = 0; let mut file = File::open(data_dir.join(file_name.clone() + ".partial")).unwrap(); let mut rec_hdr = [0u8; XLOG_RECORD_CRC_OFFS]; + while offs < wal_seg_size { if offs % XLOG_BLCKSZ == 0 { if let Ok(bytes_read) = file.read(&mut buf) { @@ -99,14 +102,21 @@ fn find_end_of_wal_segment(data_dir: &PathBuf, segno: XLogSegNo, tli: TimeLineID } else { break; } + let xlp_magic = LittleEndian::read_u16(&buf[0..2]); + let xlp_info = LittleEndian::read_u16(&buf[2..4]); + let xlp_rem_len = LittleEndian::read_u32(&buf[XLP_REM_LEN_OFFS..XLP_REM_LEN_OFFS+4]); + if xlp_magic != XLOG_PAGE_MAGIC { + info!("Invalid WAL file {}.partial magic {}", file_name, xlp_magic); + break; + } if offs == 0 { offs = XLOG_SIZE_OF_XLOG_LONG_PHD; + if (xlp_info & XLP_FIRST_IS_CONTRECORD) != 0 { + offs += ((xlp_rem_len + 7) & !7) as usize; + } } else { offs += XLOG_SIZE_OF_XLOG_SHORT_PHD; } - } else if padlen > 0 { - offs += padlen; - padlen = 0; } else if contlen == 0 { let page_offs = offs % XLOG_BLCKSZ; let xl_tot_len = LittleEndian::read_u32(&buf[page_offs..page_offs+4]) as usize; @@ -129,7 +139,7 @@ fn find_end_of_wal_segment(data_dir: &PathBuf, segno: XLogSegNo, tli: TimeLineID let len = min(XLOG_RECORD_CRC_OFFS-rec_offs, n); rec_hdr[rec_offs..rec_offs+len].copy_from_slice(&buf[page_offs..page_offs+len]); } - if rec_offs < XLOG_SIZE_OF_XLOG_RECORD && rec_offs + n >= XLOG_SIZE_OF_XLOG_RECORD { + if rec_offs <= XLOG_RECORD_CRC_OFFS && rec_offs + n >= XLOG_SIZE_OF_XLOG_RECORD { let crc_offs = page_offs - rec_offs + XLOG_RECORD_CRC_OFFS; wal_crc = LittleEndian::read_u32(&buf[crc_offs..crc_offs+4]); crc = crc32c_append(0, &buf[crc_offs+4..page_offs+n]); @@ -146,13 +156,11 @@ fn find_end_of_wal_segment(data_dir: &PathBuf, segno: XLogSegNo, tli: TimeLineID if contlen == 0 { crc = !crc; crc = crc32c_append(crc, &rec_hdr); - if offs % 8 != 0 { - padlen = 8 - (offs % 8); - } + offs = (offs + 7) & !7; // pad on 8 bytes boundary */ if crc == wal_crc { - last_valid_rec_pos = offs + padlen; + last_valid_rec_pos = offs; } else { - println!("CRC mismatch {} vs {}", crc, wal_crc); + info!("CRC mismatch {} vs {} at {}", crc, wal_crc, last_valid_rec_pos); break; } }