switch walkeeper over to new postgres blocking interface

This is a big async -> sync conversion. Most of it is a pretty
straightforward conversion: removing `async` and `.await` and swapping
in the right std modules.

I didn't find a thread-blocking version of `Notify`, so I wrote one;
then I realized that a Mutex was already in use there, so I deleted my
Notify and just used Condvar instead.
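
For reference, the pattern this ends up with looks roughly like the
sketch below: a Mutex-guarded value plus a Condvar. (Hypothetical
names, not the actual walkeeper types; the real code guards the whole
SharedState struct.)

    use std::sync::{Condvar, Mutex};

    struct Watch {
        state: Mutex<u64>,
        cond: Condvar,
    }

    impl Watch {
        /// Publish a new value and wake every waiting thread.
        fn advance(&self, v: u64) {
            let mut guard = self.state.lock().unwrap();
            if *guard < v {
                *guard = v;
                self.cond.notify_all();
            }
        }

        /// Block until the value moves past `last_seen`. The predicate is
        /// re-checked on every wakeup, since Condvar allows spurious wakeups.
        fn wait_past(&self, last_seen: u64) -> u64 {
            let mut guard = self.state.lock().unwrap();
            while *guard <= last_seen {
                guard = self.cond.wait(guard).unwrap();
            }
            *guard
        }
    }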

There is one part that seems odd to me: in `handle_start_replication`
there is a place where the previous code was doing a non-blocking read;
std::net::TcpStream has no try_read(), so I fell back on manually
flipping the socket into non-blocking mode and then back again. This
seems pretty gross, but I'm not sure exactly what to replace it with: a
background thread? Extract the fd and run select() on it first, to test
whether it's readable?
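
One possible shape of that last idea, assuming the libc crate and a
Unix target (is_readable is a hypothetical helper, not part of this
commit):

    use std::net::TcpStream;
    use std::os::unix::io::AsRawFd;

    /// Check whether the socket has bytes ready, via poll(2) with a zero
    /// timeout, instead of toggling the socket's non-blocking flag.
    fn is_readable(stream: &TcpStream) -> std::io::Result<bool> {
        let mut pfd = libc::pollfd {
            fd: stream.as_raw_fd(),
            events: libc::POLLIN,
            revents: 0,
        };
        // A timeout of 0 makes poll() return immediately.
        let n = unsafe { libc::poll(&mut pfd, 1, 0) };
        if n < 0 {
            return Err(std::io::Error::last_os_error());
        }
        Ok(n > 0 && (pfd.revents & libc::POLLIN) != 0)
    }

The feedback check could then poll first and only read when data is
ready, leaving the socket in blocking mode the whole time.
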
Eric Seppanen
2021-05-02 17:35:08 -07:00
parent 4b46693c81
commit 6c825dcbaa


@@ -8,25 +8,17 @@ use bytes::{Buf, BufMut, Bytes, BytesMut};
use fs2::FileExt;
use lazy_static::lazy_static;
use log::*;
use postgres::{Client, NoTls};
use regex::Regex;
use std::cmp::max;
use std::cmp::min;
use std::cmp::{max, min};
use std::collections::HashMap;
use std::fs;
use std::fs::File;
use std::fs::OpenOptions;
use std::io;
use std::io::prelude::*;
use std::io::SeekFrom;
use std::fs::{self, File, OpenOptions};
use std::io::{self, prelude::*, SeekFrom};
use std::mem;
use std::net::{TcpListener, TcpStream};
use std::str;
use std::sync::{Arc, Mutex};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::runtime;
use tokio::sync::Notify;
use tokio::task;
use tokio_postgres::{connect, Error, NoTls};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use crate::pq_protocol::*;
use crate::WalAcceptorConf;
@@ -47,6 +39,39 @@ const LIBPQ_MSG_SIZE_OFFS: usize = 1;
const CONTROL_FILE_NAME: &str = "safekeeper.control";
const END_OF_STREAM: XLogRecPtr = 0;
/// Read some bytes from a type that implements [`Read`] into a [`BytesMut`]
///
/// Will return the number of bytes read, just like `Read::read()` would.
///
fn read_into(r: &mut impl Read, buf: &mut BytesMut) -> io::Result<usize> {
// This is a workaround, because BytesMut and std::io don't play
// well together.
//
// I think this code needs to go away, and I'm confident that
// that's possible, if we are willing to refactor this code to
// use std::io::BufReader instead of doing buffer management
// ourselves.
//
// SAFETY: we already have exclusive access to `buf`, so
// there are no concurrency problems; the only risk would be
// accidentally exposing uninitialized parts of the buffer.
//
// We write into the buffer just past the known-initialized part,
// then manually increment its length by the exact number of
// bytes we read. So no uninitialized memory should be exposed.
let start = buf.len();
let end = buf.capacity();
let num_bytes = unsafe {
let fill_here = buf.get_unchecked_mut(start..end);
let num_bytes_read = r.read(fill_here)?;
buf.set_len(start + num_bytes_read);
num_bytes_read
};
Ok(num_bytes)
}
/*
* Unique node identifier used by Paxos
*/
@@ -142,14 +167,13 @@ struct SharedState {
hs_feedback: HotStandbyFeedback, /* combined hot standby feedback from all replicas */
}
/*
* Database instance (tenant)
*/
/// Database instance (tenant)
#[derive(Debug)]
pub struct Timeline {
timelineid: ZTimelineId,
mutex: Mutex<SharedState>,
cond: Notify, /* conditional variable used to notify wal senders */
/// condition variable used to notify wal senders
cond: Condvar,
}
/*
@@ -358,33 +382,20 @@ lazy_static! {
}
pub fn thread_main(conf: WalAcceptorConf) {
// Create a new thread pool
//
// FIXME: keep it single-threaded for now, make it easier to debug with gdb,
// and we're not concerned with performance yet.
//let runtime = runtime::Runtime::new().unwrap();
let runtime = runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
info!("Starting wal acceptor on {}", conf.listen_addr);
runtime.block_on(async {
main_loop(&conf).await.unwrap();
});
main_loop(&conf).unwrap();
}
async fn main_loop(conf: &WalAcceptorConf) -> Result<()> {
let listener = TcpListener::bind(conf.listen_addr.to_string().as_str()).await?;
fn main_loop(conf: &WalAcceptorConf) -> Result<()> {
let listener = TcpListener::bind(conf.listen_addr)?;
loop {
match listener.accept().await {
match listener.accept() {
Ok((socket, peer_addr)) => {
debug!("accepted connection from {}", peer_addr);
socket.set_nodelay(true)?;
let mut conn = Connection::new(socket, &conf);
task::spawn(async move {
if let Err(err) = conn.run().await {
thread::spawn(move || {
if let Err(err) = conn.run() {
error!("error: {}", err);
}
});
@@ -409,7 +420,7 @@ impl Timeline {
Timeline {
timelineid,
mutex: Mutex::new(shared_state),
cond: Notify::new(),
cond: Condvar::new(),
}
}
@@ -418,7 +429,7 @@ impl Timeline {
let mut shared_state = self.mutex.lock().unwrap();
if shared_state.commit_lsn < commit_lsn {
shared_state.commit_lsn = commit_lsn;
self.cond.notify_waiters();
self.cond.notify_all();
}
}
@@ -553,26 +564,26 @@ impl Connection {
self.timeline.as_ref().unwrap().clone()
}
async fn run(&mut self) -> Result<()> {
fn run(&mut self) -> Result<()> {
self.inbuf.resize(4, 0u8);
self.stream.read_exact(&mut self.inbuf[0..4]).await?;
self.stream.read_exact(&mut self.inbuf[0..4])?;
let startup_pkg_len = BigEndian::read_u32(&self.inbuf[0..4]);
if startup_pkg_len == 0 {
self.receive_wal().await?; // internal protocol between wal_proposer and wal_acceptor
self.receive_wal()?; // internal protocol between wal_proposer and wal_acceptor
} else {
self.send_wal().await?; // libpq replication protocol between wal_acceptor and replicas/pagers
self.send_wal()?; // libpq replication protocol between wal_acceptor and replicas/pagers
}
Ok(())
}
async fn read_req<T: Serializer>(&mut self) -> Result<T> {
fn read_req<T: Serializer>(&mut self) -> Result<T> {
let size = mem::size_of::<T>();
self.inbuf.resize(size, 0u8);
self.stream.read_exact(&mut self.inbuf[0..size]).await?;
self.stream.read_exact(&mut self.inbuf[0..size])?;
Ok(T::unpack(&mut self.inbuf))
}
async fn request_callback(&self) -> std::result::Result<(), Error> {
fn request_callback(&self) -> std::result::Result<(), postgres::error::Error> {
if let Some(addr) = self.conf.pageserver_addr {
let ps_connstr = format!(
"host={} port={} dbname={} user={}",
@@ -592,16 +603,8 @@ impl Connection {
"requesting page server to connect to us: start {} {}",
ps_connstr, callme
);
let (client, connection) = connect(&ps_connstr, NoTls).await?;
// The connection object performs the actual communication with the database,
// so spawn it off to run on its own.
tokio::spawn(async move {
if let Err(e) = connection.await {
error!("pageserver connection error: {}", e);
}
});
client.simple_query(&callme).await?;
let mut client = Client::connect(&ps_connstr, NoTls)?;
client.simple_query(&callme)?;
}
Ok(())
}
@@ -618,9 +621,9 @@ impl Connection {
}
// Receive WAL from wal_proposer
async fn receive_wal(&mut self) -> Result<()> {
fn receive_wal(&mut self) -> Result<()> {
// Receive information about server
let server_info = self.read_req::<ServerInfo>().await?;
let server_info = self.read_req::<ServerInfo>()?;
info!(
"Start handshake with wal_proposer {} sysid {} timeline {}",
self.stream.peer_addr()?,
@@ -663,16 +666,16 @@ impl Connection {
/* Report my identifier to proxy */
self.start_sending();
my_info.pack(&mut self.outbuf);
self.send().await?;
self.send()?;
/* Wait for vote request */
let prop = self.read_req::<RequestVote>().await?;
let prop = self.read_req::<RequestVote>()?;
/* This is Paxos check which should ensure that only one master can perform commits */
if prop.node_id < my_info.server.node_id {
/* Send my node-id to inform proxy that its candidate was rejected */
self.start_sending();
my_info.server.node_id.pack(&mut self.outbuf);
self.send().await?;
self.send()?;
io_error!(
"Reject connection attempt with term {} because my term is {}",
prop.node_id.term,
@@ -690,11 +693,11 @@ impl Connection {
/* Acknowledge the proposed candidate by returning it to the proxy */
self.start_sending();
prop.node_id.pack(&mut self.outbuf);
self.send().await?;
self.send()?;
// Need to establish replication channel with page server.
// As far as replication in postgres is initiated by the receiver, we should use the callme mechanism
if let Err(e) = self.request_callback().await {
if let Err(e) = self.request_callback() {
// Do not treat it as a fatal error and continue work
// FIXME: we should retry after a while...
error!("Failed to send callme request to pageserver: {}", e);
@@ -711,7 +714,7 @@ impl Connection {
let mut sync_control_file = false;
/* Receive message header */
let req = self.read_req::<SafeKeeperRequest>().await?;
let req = self.read_req::<SafeKeeperRequest>()?;
if req.sender_id != my_info.server.node_id {
io_error!("Sender NodeId is changed");
}
@@ -735,7 +738,7 @@ impl Connection {
/* Receive message body */
self.inbuf.resize(rec_size, 0u8);
self.stream.read_exact(&mut self.inbuf[0..rec_size]).await?;
self.stream.read_exact(&mut self.inbuf[0..rec_size])?;
/* Save message in file */
self.write_wal_file(start_pos, timeline, wal_seg_size, &self.inbuf[0..rec_size])?;
@@ -778,7 +781,7 @@ impl Connection {
};
self.start_sending();
resp.pack(&mut self.outbuf);
self.send().await?;
self.send()?;
/*
* Ping wal sender that new data is available.
@@ -793,13 +796,13 @@ impl Connection {
//
// Read full message or return None if connection is closed
//
async fn read_message(&mut self) -> Result<Option<FeMessage>> {
fn read_message(&mut self) -> Result<Option<FeMessage>> {
loop {
if let Some(message) = self.parse_message()? {
return Ok(Some(message));
}
if self.stream.read_buf(&mut self.inbuf).await? == 0 {
if read_into(&mut self.stream, &mut self.inbuf)? == 0 {
if self.inbuf.is_empty() {
return Ok(None);
} else {
@@ -830,18 +833,18 @@ impl Connection {
//
// Send buffered messages
//
async fn send(&mut self) -> Result<()> {
self.stream.write_all(&self.outbuf).await
fn send(&mut self) -> Result<()> {
self.stream.write_all(&self.outbuf)
}
//
// Send WAL to replica or WAL receiver using standard libpq replication protocol
//
async fn send_wal(&mut self) -> Result<()> {
fn send_wal(&mut self) -> Result<()> {
info!("WAL sender to {:?} is started", self.stream.peer_addr()?);
loop {
self.start_sending();
match self.read_message().await? {
match self.read_message()? {
Some(FeMessage::StartupMessage(m)) => {
trace!("got message {:?}", m);
@@ -849,12 +852,12 @@ impl Connection {
StartupRequestCode::NegotiateGss | StartupRequestCode::NegotiateSsl => {
BeMessage::write(&mut self.outbuf, &BeMessage::Negotiate);
info!("SSL requested");
self.send().await?;
self.send()?;
}
StartupRequestCode::Normal => {
BeMessage::write(&mut self.outbuf, &BeMessage::AuthenticationOk);
BeMessage::write(&mut self.outbuf, &BeMessage::ReadyForQuery);
self.send().await?;
self.send()?;
self.init_done = true;
self.set_timeline(m.timelineid)?;
self.appname = m.appname;
@@ -863,7 +866,7 @@ impl Connection {
}
}
Some(FeMessage::Query(m)) => {
if !self.process_query(&m).await? {
if !self.process_query(&m)? {
break;
}
}
@@ -886,7 +889,7 @@ impl Connection {
//
// Handle IDENTIFY_SYSTEM replication command
//
async fn handle_identify_system(&mut self) -> Result<bool> {
fn handle_identify_system(&mut self) -> Result<bool> {
let (start_pos, timeline) = self.find_end_of_wal(false);
let lsn = format!("{:X}/{:>08X}", (start_pos >> 32) as u32, start_pos as u32);
let tli = timeline.to_string();
@@ -929,14 +932,14 @@ impl Connection {
&BeMessage::CommandComplete(b"IDENTIFY_SYSTEM\0"),
);
BeMessage::write(&mut self.outbuf, &BeMessage::ReadyForQuery);
self.send().await?;
self.send()?;
Ok(true)
}
//
// Handle START_REPLICATION replication command
//
async fn handle_start_replication(&mut self, cmd: &Bytes) -> Result<bool> {
fn handle_start_replication(&mut self, cmd: &Bytes) -> Result<bool> {
let re = Regex::new(r"([[:xdigit:]]*)/([[:xdigit:]]*)").unwrap();
let mut caps = re.captures_iter(str::from_utf8(&cmd[..]).unwrap());
let cap = caps.next().unwrap();
@@ -965,7 +968,7 @@ impl Connection {
stop_pos as u32
);
BeMessage::write(&mut self.outbuf, &BeMessage::Copy);
self.send().await?;
self.send()?;
let mut end_pos: XLogRecPtr;
let mut commit_lsn: XLogRecPtr;
@@ -984,24 +987,29 @@ impl Connection {
} else {
/* normal mode */
let timeline = self.timeline();
let mut shared_state = timeline.mutex.lock().unwrap();
loop {
// Rust doesn't allow to grab async result from mutex scope
{
let shared_state = timeline.mutex.lock().unwrap();
commit_lsn = shared_state.commit_lsn;
if start_pos < commit_lsn {
end_pos = commit_lsn;
break;
}
commit_lsn = shared_state.commit_lsn;
if start_pos < commit_lsn {
end_pos = commit_lsn;
break;
}
timeline.cond.notified().await;
shared_state = timeline.cond.wait(shared_state).unwrap();
}
}
if end_pos == END_REPLICATION_MARKER {
break;
}
// Try to fetch replica's feedback
match self.stream.try_read_buf(&mut self.inbuf) {
// Temporarily set this stream into nonblocking mode.
// FIXME: This seems like a dirty hack.
// Should this task be done on a background thread?
self.stream.set_nonblocking(true).unwrap();
let read_result = self.stream.read(&mut self.inbuf);
self.stream.set_nonblocking(false).unwrap();
match read_result {
Ok(0) => break,
Ok(_) => {
if let Some(FeMessage::CopyData(m)) = self.parse_message()? {
@@ -1069,7 +1077,7 @@ impl Connection {
BigEndian::write_u64(&mut self.outbuf[14..22], end_pos);
BigEndian::write_u64(&mut self.outbuf[22..30], get_current_timestamp());
self.stream.write_all(&self.outbuf[0..msg_size]).await?;
self.stream.write_all(&self.outbuf[0..msg_size])?;
start_pos += send_size as u64;
debug!(
@@ -1085,13 +1093,13 @@ impl Connection {
Ok(false)
}
async fn process_query(&mut self, q: &FeQueryMessage) -> Result<bool> {
fn process_query(&mut self, q: &FeQueryMessage) -> Result<bool> {
trace!("got query {:?}", q.body);
if q.body.starts_with(b"IDENTIFY_SYSTEM") {
self.handle_identify_system().await
self.handle_identify_system()
} else if q.body.starts_with(b"START_REPLICATION") {
self.handle_start_replication(&q.body).await
self.handle_start_replication(&q.body)
} else {
io_error!("Unexpected command {:?}", q.body);
}