walkeeper replication: remove the lock from the send stream.

I originally thought there would be multiple threads sending here, but
that's not currently the case, so remove the lock.
This commit is contained in:
Eric Seppanen
2021-05-18 14:51:51 -07:00
committed by Eric Seppanen
parent e0146304e6
commit 9fe3b73e13

View File

@@ -16,7 +16,7 @@ use std::fs::File;
use std::io::{BufReader, Read, Seek, SeekFrom, Write};
use std::net::TcpStream;
use std::path::Path;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use std::{str, thread};
use zenith_utils::bin_ser::BeSer;
use zenith_utils::lsn::Lsn;
@@ -45,7 +45,7 @@ pub struct ReplicationConn {
/// `take` it from us.
stream_in: Option<BufReader<TcpStream>>,
/// Postgres connection, output
stream_out: Mutex<TcpStream>,
stream_out: TcpStream,
/// wal acceptor configuration
conf: WalAcceptorConf,
/// assigned application name
@@ -58,7 +58,7 @@ impl ReplicationConn {
Self {
timeline: conn.timeline,
stream_in: Some(conn.stream_in),
stream_out: Mutex::new(conn.stream_out),
stream_out: conn.stream_out,
conf: conn.conf,
appname: None,
}
@@ -213,8 +213,8 @@ impl ReplicationConn {
outbuf.put_u64(get_current_timestamp());
assert!(outbuf.len() + file_buf.len() == msg_size);
// FIXME: combine these two into a single send,
// so that no other traffic can be sent in between them.
// This thread has exclusive access to the TcpStream, so it's fine
// to do this as two separate calls.
self.send(&outbuf)?;
self.send(&file_buf)?;
start_pos += send_size as u64;
@@ -230,10 +230,9 @@ impl ReplicationConn {
Ok(())
}
/// Unlock the mutex and send bytes on the network.
fn send(&self, buf: &[u8]) -> Result<()> {
let mut writer = self.stream_out.lock().unwrap();
writer.write_all(buf.as_ref())?;
/// Send messages on the network.
fn send(&mut self, buf: &[u8]) -> Result<()> {
self.stream_out.write_all(buf.as_ref())?;
Ok(())
}
}