From 6c825dcbaa80f77b0a2b799747d576edae290599 Mon Sep 17 00:00:00 2001 From: Eric Seppanen Date: Sun, 2 May 2021 17:35:08 -0700 Subject: [PATCH] switch walkeeper over to new postgres blocking interface This is a big async -> sync conversion. Most of it is a pretty straightforward conversion of removing `async` and `.await` and swapping in the right std modules. I didn't find a thread-blocking version of `Notify` so I wrote one, and then realized that there was already a Mutex being used there, so I deleted my Notify and just used Condvar instead. There is one part that seems odd to me: in `handle_start_replication` there is a place where the previous code was doing a non-blocking read; there is no TcpStream::try_read() so I fell back on manually flipping the socket to non-blocking mode and then back again. This seems pretty gross, but I'm not sure exactly what to replace this with: a background thread? Extract the fd and run select() on it to first test if it's readable? --- walkeeper/src/wal_service.rs | 196 ++++++++++++++++++----------------- 1 file changed, 102 insertions(+), 94 deletions(-) diff --git a/walkeeper/src/wal_service.rs b/walkeeper/src/wal_service.rs index 7e64a91eb5..c270ca5efc 100644 --- a/walkeeper/src/wal_service.rs +++ b/walkeeper/src/wal_service.rs @@ -8,25 +8,17 @@ use bytes::{Buf, BufMut, Bytes, BytesMut}; use fs2::FileExt; use lazy_static::lazy_static; use log::*; +use postgres::{Client, NoTls}; use regex::Regex; -use std::cmp::max; -use std::cmp::min; +use std::cmp::{max, min}; use std::collections::HashMap; -use std::fs; -use std::fs::File; -use std::fs::OpenOptions; -use std::io; -use std::io::prelude::*; -use std::io::SeekFrom; +use std::fs::{self, File, OpenOptions}; +use std::io::{self, prelude::*, SeekFrom}; use std::mem; +use std::net::{TcpListener, TcpStream}; use std::str; -use std::sync::{Arc, Mutex}; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use tokio::net::{TcpListener, TcpStream}; -use tokio::runtime; -use 
tokio::sync::Notify; -use tokio::task; -use tokio_postgres::{connect, Error, NoTls}; +use std::sync::{Arc, Condvar, Mutex}; +use std::thread; use crate::pq_protocol::*; use crate::WalAcceptorConf; @@ -47,6 +39,39 @@ const LIBPQ_MSG_SIZE_OFFS: usize = 1; const CONTROL_FILE_NAME: &str = "safekeeper.control"; const END_OF_STREAM: XLogRecPtr = 0; +/// Read some bytes from a type that implements [`Read`] into a [`BytesMut`] +/// +/// Will return the number of bytes read, just like `Read::read()` would. +/// +fn read_into(r: &mut impl Read, buf: &mut BytesMut) -> io::Result { + // This is a workaround, because BytesMut and std::io don't play + // well together. + // + // I think this code needs to go away, and I'm confident that + // that's possible, if we are willing to refactor this code to + // use std::io::BufReader instead of doing buffer management + // ourselves. + // + // SAFETY: we hold `buf` by `&mut`, so access is exclusive and + // there are no concurrency problems; the only risk would be + // accidentally exposing uninitialized parts of the buffer. + // + // We write into the buffer just past the known-initialized part, + // then manually increment its length by the exact number of + // bytes we read. So no uninitialized memory should be exposed. 
+ + let start = buf.len(); + let end = buf.capacity(); + + let num_bytes = unsafe { + let fill_here = buf.get_unchecked_mut(start..end); + let num_bytes_read = r.read(fill_here)?; + buf.set_len(start + num_bytes_read); + num_bytes_read + }; + Ok(num_bytes) +} + /* * Unique node identifier used by Paxos */ @@ -142,14 +167,13 @@ struct SharedState { hs_feedback: HotStandbyFeedback, /* combined hot standby feedback from all replicas */ } -/* - * Database instance (tenant) - */ +/// Database instance (tenant) #[derive(Debug)] pub struct Timeline { timelineid: ZTimelineId, mutex: Mutex, - cond: Notify, /* conditional variable used to notify wal senders */ + /// conditional variable used to notify wal senders + cond: Condvar, } /* @@ -358,33 +382,20 @@ lazy_static! { } pub fn thread_main(conf: WalAcceptorConf) { - // Create a new thread pool - // - // FIXME: keep it single-threaded for now, make it easier to debug with gdb, - // and we're not concerned with performance yet. - //let runtime = runtime::Runtime::new().unwrap(); - let runtime = runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(); - info!("Starting wal acceptor on {}", conf.listen_addr); - - runtime.block_on(async { - main_loop(&conf).await.unwrap(); - }); + main_loop(&conf).unwrap(); } -async fn main_loop(conf: &WalAcceptorConf) -> Result<()> { - let listener = TcpListener::bind(conf.listen_addr.to_string().as_str()).await?; +fn main_loop(conf: &WalAcceptorConf) -> Result<()> { + let listener = TcpListener::bind(conf.listen_addr)?; loop { - match listener.accept().await { + match listener.accept() { Ok((socket, peer_addr)) => { debug!("accepted connection from {}", peer_addr); socket.set_nodelay(true)?; let mut conn = Connection::new(socket, &conf); - task::spawn(async move { - if let Err(err) = conn.run().await { + thread::spawn(move || { + if let Err(err) = conn.run() { error!("error: {}", err); } }); @@ -409,7 +420,7 @@ impl Timeline { Timeline { timelineid, mutex: 
Mutex::new(shared_state), - cond: Notify::new(), + cond: Condvar::new(), } } @@ -418,7 +429,7 @@ impl Timeline { let mut shared_state = self.mutex.lock().unwrap(); if shared_state.commit_lsn < commit_lsn { shared_state.commit_lsn = commit_lsn; - self.cond.notify_waiters(); + self.cond.notify_all(); } } @@ -553,26 +564,26 @@ impl Connection { self.timeline.as_ref().unwrap().clone() } - async fn run(&mut self) -> Result<()> { + fn run(&mut self) -> Result<()> { self.inbuf.resize(4, 0u8); - self.stream.read_exact(&mut self.inbuf[0..4]).await?; + self.stream.read_exact(&mut self.inbuf[0..4])?; let startup_pkg_len = BigEndian::read_u32(&self.inbuf[0..4]); if startup_pkg_len == 0 { - self.receive_wal().await?; // internal protocol between wal_proposer and wal_acceptor + self.receive_wal()?; // internal protocol between wal_proposer and wal_acceptor } else { - self.send_wal().await?; // libpq replication protocol between wal_acceptor and replicas/pagers + self.send_wal()?; // libpq replication protocol between wal_acceptor and replicas/pagers } Ok(()) } - async fn read_req(&mut self) -> Result { + fn read_req(&mut self) -> Result { let size = mem::size_of::(); self.inbuf.resize(size, 0u8); - self.stream.read_exact(&mut self.inbuf[0..size]).await?; + self.stream.read_exact(&mut self.inbuf[0..size])?; Ok(T::unpack(&mut self.inbuf)) } - async fn request_callback(&self) -> std::result::Result<(), Error> { + fn request_callback(&self) -> std::result::Result<(), postgres::error::Error> { if let Some(addr) = self.conf.pageserver_addr { let ps_connstr = format!( "host={} port={} dbname={} user={}", @@ -592,16 +603,8 @@ impl Connection { "requesting page server to connect to us: start {} {}", ps_connstr, callme ); - let (client, connection) = connect(&ps_connstr, NoTls).await?; - - // The connection object performs the actual communication with the database, - // so spawn it off to run on its own. 
- tokio::spawn(async move { - if let Err(e) = connection.await { - error!("pageserver connection error: {}", e); - } - }); - client.simple_query(&callme).await?; + let mut client = Client::connect(&ps_connstr, NoTls)?; + client.simple_query(&callme)?; } Ok(()) } @@ -618,9 +621,9 @@ impl Connection { } // Receive WAL from wal_proposer - async fn receive_wal(&mut self) -> Result<()> { + fn receive_wal(&mut self) -> Result<()> { // Receive information about server - let server_info = self.read_req::().await?; + let server_info = self.read_req::()?; info!( "Start handshake with wal_proposer {} sysid {} timeline {}", self.stream.peer_addr()?, @@ -663,16 +666,16 @@ impl Connection { /* Report my identifier to proxy */ self.start_sending(); my_info.pack(&mut self.outbuf); - self.send().await?; + self.send()?; /* Wait for vote request */ - let prop = self.read_req::().await?; + let prop = self.read_req::()?; /* This is Paxos check which should ensure that only one master can perform commits */ if prop.node_id < my_info.server.node_id { /* Send my node-id to inform proxy that it's candidate was rejected */ self.start_sending(); my_info.server.node_id.pack(&mut self.outbuf); - self.send().await?; + self.send()?; io_error!( "Reject connection attempt with term {} because my term is {}", prop.node_id.term, @@ -690,11 +693,11 @@ impl Connection { /* Acknowledge the proposed candidate by returning it to the proxy */ self.start_sending(); prop.node_id.pack(&mut self.outbuf); - self.send().await?; + self.send()?; // Need to establish replication channel with page server. // Add far as replication in postgres is initiated by receiver, we should use callme mechanism - if let Err(e) = self.request_callback().await { + if let Err(e) = self.request_callback() { // Do not treate it as fatal error and continue work // FIXME: we should retry after a while... 
error!("Failed to send callme request to pageserver: {}", e); @@ -711,7 +714,7 @@ impl Connection { let mut sync_control_file = false; /* Receive message header */ - let req = self.read_req::().await?; + let req = self.read_req::()?; if req.sender_id != my_info.server.node_id { io_error!("Sender NodeId is changed"); } @@ -735,7 +738,7 @@ impl Connection { /* Receive message body */ self.inbuf.resize(rec_size, 0u8); - self.stream.read_exact(&mut self.inbuf[0..rec_size]).await?; + self.stream.read_exact(&mut self.inbuf[0..rec_size])?; /* Save message in file */ self.write_wal_file(start_pos, timeline, wal_seg_size, &self.inbuf[0..rec_size])?; @@ -778,7 +781,7 @@ impl Connection { }; self.start_sending(); resp.pack(&mut self.outbuf); - self.send().await?; + self.send()?; /* * Ping wal sender that new data is available. @@ -793,13 +796,13 @@ impl Connection { // // Read full message or return None if connection is closed // - async fn read_message(&mut self) -> Result> { + fn read_message(&mut self) -> Result> { loop { if let Some(message) = self.parse_message()? { return Ok(Some(message)); } - if self.stream.read_buf(&mut self.inbuf).await? == 0 { + if read_into(&mut self.stream, &mut self.inbuf)? == 0 { if self.inbuf.is_empty() { return Ok(None); } else { @@ -830,18 +833,18 @@ impl Connection { // // Send buffered messages // - async fn send(&mut self) -> Result<()> { - self.stream.write_all(&self.outbuf).await + fn send(&mut self) -> Result<()> { + self.stream.write_all(&self.outbuf) } // // Send WAL to replica or WAL receiver using standard libpq replication protocol // - async fn send_wal(&mut self) -> Result<()> { + fn send_wal(&mut self) -> Result<()> { info!("WAL sender to {:?} is started", self.stream.peer_addr()?); loop { self.start_sending(); - match self.read_message().await? { + match self.read_message()? 
{ Some(FeMessage::StartupMessage(m)) => { trace!("got message {:?}", m); @@ -849,12 +852,12 @@ impl Connection { StartupRequestCode::NegotiateGss | StartupRequestCode::NegotiateSsl => { BeMessage::write(&mut self.outbuf, &BeMessage::Negotiate); info!("SSL requested"); - self.send().await?; + self.send()?; } StartupRequestCode::Normal => { BeMessage::write(&mut self.outbuf, &BeMessage::AuthenticationOk); BeMessage::write(&mut self.outbuf, &BeMessage::ReadyForQuery); - self.send().await?; + self.send()?; self.init_done = true; self.set_timeline(m.timelineid)?; self.appname = m.appname; @@ -863,7 +866,7 @@ impl Connection { } } Some(FeMessage::Query(m)) => { - if !self.process_query(&m).await? { + if !self.process_query(&m)? { break; } } @@ -886,7 +889,7 @@ impl Connection { // // Handle IDENTIFY_SYSTEM replication command // - async fn handle_identify_system(&mut self) -> Result { + fn handle_identify_system(&mut self) -> Result { let (start_pos, timeline) = self.find_end_of_wal(false); let lsn = format!("{:X}/{:>08X}", (start_pos >> 32) as u32, start_pos as u32); let tli = timeline.to_string(); @@ -929,14 +932,14 @@ impl Connection { &BeMessage::CommandComplete(b"IDENTIFY_SYSTEM\0"), ); BeMessage::write(&mut self.outbuf, &BeMessage::ReadyForQuery); - self.send().await?; + self.send()?; Ok(true) } // // Handle START_REPLICATION replication command // - async fn handle_start_replication(&mut self, cmd: &Bytes) -> Result { + fn handle_start_replication(&mut self, cmd: &Bytes) -> Result { let re = Regex::new(r"([[:xdigit:]]*)/([[:xdigit:]]*)").unwrap(); let mut caps = re.captures_iter(str::from_utf8(&cmd[..]).unwrap()); let cap = caps.next().unwrap(); @@ -965,7 +968,7 @@ impl Connection { stop_pos as u32 ); BeMessage::write(&mut self.outbuf, &BeMessage::Copy); - self.send().await?; + self.send()?; let mut end_pos: XLogRecPtr; let mut commit_lsn: XLogRecPtr; @@ -984,24 +987,29 @@ impl Connection { } else { /* normal mode */ let timeline = self.timeline(); + let mut 
shared_state = timeline.mutex.lock().unwrap(); loop { - // Rust doesn't allow to grab async result from mutex scope - { - let shared_state = timeline.mutex.lock().unwrap(); - commit_lsn = shared_state.commit_lsn; - if start_pos < commit_lsn { - end_pos = commit_lsn; - break; - } + commit_lsn = shared_state.commit_lsn; + if start_pos < commit_lsn { + end_pos = commit_lsn; + break; } - timeline.cond.notified().await; + shared_state = timeline.cond.wait(shared_state).unwrap(); } } if end_pos == END_REPLICATION_MARKER { break; } // Try to fetch replica's feedback - match self.stream.try_read_buf(&mut self.inbuf) { + + // Temporarily set this stream into nonblocking mode. + // FIXME: This seems like a dirty hack. + // Should this task be done on a background thread? + self.stream.set_nonblocking(true).unwrap(); + let read_result = self.stream.read(&mut self.inbuf); + self.stream.set_nonblocking(false).unwrap(); + + match read_result { Ok(0) => break, Ok(_) => { if let Some(FeMessage::CopyData(m)) = self.parse_message()? { @@ -1069,7 +1077,7 @@ impl Connection { BigEndian::write_u64(&mut self.outbuf[14..22], end_pos); BigEndian::write_u64(&mut self.outbuf[22..30], get_current_timestamp()); - self.stream.write_all(&self.outbuf[0..msg_size]).await?; + self.stream.write_all(&self.outbuf[0..msg_size])?; start_pos += send_size as u64; debug!( @@ -1085,13 +1093,13 @@ impl Connection { Ok(false) } - async fn process_query(&mut self, q: &FeQueryMessage) -> Result { + fn process_query(&mut self, q: &FeQueryMessage) -> Result { trace!("got query {:?}", q.body); if q.body.starts_with(b"IDENTIFY_SYSTEM") { - self.handle_identify_system().await + self.handle_identify_system() } else if q.body.starts_with(b"START_REPLICATION") { - self.handle_start_replication(&q.body).await + self.handle_start_replication(&q.body) } else { io_error!("Unexpected command {:?}", q.body); }