Shutdown socket in ReplicationConn

This commit is contained in:
Konstantin Knizhnik
2021-05-31 18:32:41 +03:00
committed by Stas Kelvich
parent e0cc4dee4f
commit 1aceea1bdd

View File

@@ -5,7 +5,7 @@ use crate::pq_protocol::{BeMessage, FeMessage};
use crate::send_wal::SendWalConn;
use crate::timeline::{Timeline, TimelineTools};
use crate::WalAcceptorConf;
use anyhow::{anyhow, bail, Result};
use anyhow::{anyhow, Result};
use bytes::{BufMut, Bytes, BytesMut};
use log::*;
use postgres_ffi::xlog_utils::{get_current_timestamp, TimestampTz, XLogFileName, MAX_SEND_SIZE};
@@ -14,9 +14,11 @@ use serde::{Deserialize, Serialize};
use std::cmp::min;
use std::fs::File;
use std::io::{BufReader, Read, Seek, SeekFrom, Write};
use std::net::TcpStream;
use std::net::{Shutdown, TcpStream};
use std::path::Path;
use std::sync::Arc;
use std::thread::sleep;
use std::time::Duration;
use std::{str, thread};
use zenith_utils::bin_ser::BeSer;
use zenith_utils::lsn::Lsn;
@@ -52,6 +54,14 @@ pub struct ReplicationConn {
appname: Option<String>,
}
// Separate thread is reading keepalives from the socket. When main one
// finishes, tell it to get down by shutdowning the socket.
impl Drop for ReplicationConn {
fn drop(&mut self) {
let _res = self.stream_out.shutdown(Shutdown::Both);
}
}
impl ReplicationConn {
/// Create a new `SendWal`, consuming the `Connection`.
pub fn new(conn: SendWalConn) -> Self {
@@ -131,9 +141,15 @@ impl ReplicationConn {
let (mut start_pos, mut stop_pos) = Self::parse_start_stop(&cmd)?;
let wal_seg_size = self.timeline.get().get_info().server.wal_seg_size as usize;
if wal_seg_size == 0 {
bail!("Can not start replication before connecting to wal_proposer");
let mut wal_seg_size: usize;
loop {
wal_seg_size = self.timeline.get().get_info().server.wal_seg_size as usize;
if wal_seg_size == 0 {
error!("Can not start replication before connecting to wal_proposer");
sleep(Duration::from_secs(1));
} else {
break;
}
}
let (wal_end, timeline) = self.timeline.find_end_of_wal(&self.conf.data_dir, false);
if start_pos == Lsn(0) {