diff --git a/walkeeper/src/replication.rs b/walkeeper/src/replication.rs index 87faa57bfc..14e34ac8fc 100644 --- a/walkeeper/src/replication.rs +++ b/walkeeper/src/replication.rs @@ -16,7 +16,7 @@ use std::fs::File; use std::io::{BufReader, Read, Seek, SeekFrom, Write}; use std::net::TcpStream; use std::path::Path; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::{str, thread}; use zenith_utils::bin_ser::BeSer; use zenith_utils::lsn::Lsn; @@ -45,7 +45,7 @@ pub struct ReplicationConn { /// `take` it from us. stream_in: Option>, /// Postgres connection, output - stream_out: Mutex, + stream_out: TcpStream, /// wal acceptor configuration conf: WalAcceptorConf, /// assigned application name @@ -58,7 +58,7 @@ impl ReplicationConn { Self { timeline: conn.timeline, stream_in: Some(conn.stream_in), - stream_out: Mutex::new(conn.stream_out), + stream_out: conn.stream_out, conf: conn.conf, appname: None, } @@ -213,8 +213,8 @@ impl ReplicationConn { outbuf.put_u64(get_current_timestamp()); assert!(outbuf.len() + file_buf.len() == msg_size); - // FIXME: combine these two into a single send, - // so that no other traffic can be sent in between them. + // This thread has exclusive access to the TcpStream, so it's fine + // to do this as two separate calls. self.send(&outbuf)?; self.send(&file_buf)?; start_pos += send_size as u64; @@ -230,10 +230,9 @@ impl ReplicationConn { Ok(()) } - /// Unlock the mutex and send bytes on the network. - fn send(&self, buf: &[u8]) -> Result<()> { - let mut writer = self.stream_out.lock().unwrap(); - writer.write_all(buf.as_ref())?; + /// Send messages on the network. + fn send(&mut self, buf: &[u8]) -> Result<()> { + self.stream_out.write_all(buf.as_ref())?; Ok(()) } }